摘要:本文所介紹 Nebula Graph 連接器 Nebula Flink Connector申鱼,采用類似 Flink 提供的 Flink Connector 形式英妓,支持 Flink 讀寫分布式圖數(shù)據(jù)庫 Nebula Graph。
文章首發(fā) Nebula Graph 官網(wǎng)博客:https://nebula-graph.com.cn/posts/nebula-flink-connector/
在關(guān)系網(wǎng)絡(luò)分析酒请、關(guān)系建模骡技、實時推薦等場景中應(yīng)用圖數(shù)據(jù)庫作為后臺數(shù)據(jù)支撐已相對普及,且部分應(yīng)用場景對圖數(shù)據(jù)的實時性要求較高羞反,如推薦系統(tǒng)哮兰、搜索引擎。為了提升數(shù)據(jù)的實時性,業(yè)界廣泛應(yīng)用流式計算對更新的數(shù)據(jù)進行增量實時處理。為了支持對圖數(shù)據(jù)的流式計算宋梧,Nebula Graph 團隊開發(fā)了 Nebula Flink Connector介牙,支持利用 Flink 進行 Nebula Graph 圖數(shù)據(jù)的流式處理和計算。
Flink 是新一代流批統(tǒng)一的計算引擎,它從不同的第三方存儲引擎中讀取數(shù)據(jù)最冰,并進行處理分扎,再寫入另外的存儲引擎中窘哈。Flink Connector 的作用就相當于一個連接器吹榴,連接 Flink 計算引擎跟外界存儲系統(tǒng)。
與外界進行數(shù)據(jù)交換時滚婉,F(xiàn)link 支持以下 4 種方式:
- Flink 源碼內(nèi)部預定義 Source 和 Sink 的 API图筹;
- Flink 內(nèi)部提供了 Bundled Connectors,如 JDBC Connector让腹。
- Apache Bahir 項目中提供連接器
Apache Bahir 最初是從 Apache Spark 中獨立出來的項目远剩,以提供不限于 Spark 相關(guān)的擴展/插件、連接器和其他可插入組件的實現(xiàn)骇窍。 - 通過異步 I/O 方式瓜晤。
流計算中經(jīng)常需要與外部存儲系統(tǒng)交互,比如需要關(guān)聯(lián) MySQL 中的某個表腹纳。一般來說痢掠,如果用同步 I/O 的方式,會造成系統(tǒng)中出現(xiàn)大的等待時間嘲恍,影響吞吐和延遲足画。異步 I/O 則可以并發(fā)處理多個請求,提高吞吐佃牛,減少延遲淹辞。
本文所介紹 Nebula Graph 連接器 Nebula Flink Connector,采用類似 Flink 提供的 Flink Connector 形式吁脱,支持 Flink 讀寫分布式圖數(shù)據(jù)庫 Nebula Graph桑涎。
一彬向、Connector Source
Flink 作為一款流式計算框架兼贡,它可處理有界數(shù)據(jù),也可處理無界數(shù)據(jù)娃胆。所謂無界遍希,即源源不斷的數(shù)據(jù),不會有終止里烦,實時流處理所處理的數(shù)據(jù)便是無界數(shù)據(jù)凿蒜;批處理的數(shù)據(jù),即有界數(shù)據(jù)胁黑。而 Source 便是 Flink 處理數(shù)據(jù)的數(shù)據(jù)來源废封。
Nebula Flink Connector 中的 Source 便是圖數(shù)據(jù)庫 Nebula Graph。Flink 提供了豐富的 Connector 組件允許用戶自定義數(shù)據(jù)源來連接外部數(shù)據(jù)存儲系統(tǒng)丧蘸。
1.1 Source 簡介
Flink 的 Source 主要負責外部數(shù)據(jù)源的接入漂洋,F(xiàn)link 的 Source 能力主要是通過 read 相關(guān)的 API 和 addSource 方法這 2 種方式來實現(xiàn)數(shù)據(jù)源的讀取,使用 addSource 方法對接外部數(shù)據(jù)源時,可以使用 Flink Bundled Connector刽漂,也可以自定義 Source演训。
Flink Source 的幾種使用方式如下:
本章主要介紹如何通過自定義 Source 方式實現(xiàn) Nebula Graph Source。
1.2 自定義 Source
在 Flink 中可以使用 StreamExecutionEnvironment.addSource(sourceFunction)
和 ExecutionEnvironment.createInput(inputFormat)
兩種方式來為你的程序添加數(shù)據(jù)來源贝咙。
Flink 已經(jīng)提供多個內(nèi)置的 source functions
样悟,開發(fā)者可以通過繼承 RichSourceFunction
來自定義非并行的 source
,通過繼承 RichParallelSourceFunction
來自定義并行的 Source
庭猩。RichSourceFunction
和 RichParallelSourceFunction
是 SourceFunction
和 RichFunction
特性的結(jié)合窟她。 其中SourceFunction
負責數(shù)據(jù)的生成, RichFunction
負責資源的管理眯娱。當然礁苗,也可以只實現(xiàn) SourceFunction
接口來定義最簡單的只具備獲取數(shù)據(jù)功能的 dataSource
。
通常自定義一個完善的 Source 節(jié)點是通過實現(xiàn) RichSourceFunction
類來完成的徙缴,該類兼具 RichFunction
和 SourceFunction
的能力试伙,因此自定義 Flink 的 Nebula Graph Source 功能我們需要實現(xiàn) RichSourceFunction
中提供的方法。
1.3 自定義 Nebula Graph Source 實現(xiàn)原理
Nebula Flink Connector 中實現(xiàn)的自定義 Nebula Graph Source 數(shù)據(jù)源提供了兩種使用方式于样,分別是 addSource 和 createInput 方式疏叨。
Nebula Graph Source 實現(xiàn)類圖如下:
(1)addSource
該方式是通過 NebulaSourceFunction 類實現(xiàn)的,該類繼承自 RichSourceFunction 并實現(xiàn)了以下方法:
- open
準備 Nebula Graph 連接信息穿剖,并獲取 Nebula Graph Meta 服務(wù)和 Storage 服務(wù)的連接蚤蔓。 - close
數(shù)據(jù)讀取完成,釋放資源糊余。關(guān)閉 Nebula Graph 服務(wù)的連接秀又。 - run
開始讀取數(shù)據(jù),并將數(shù)據(jù)填充到 sourceContext贬芥。 - cancel
取消 Flink 作業(yè)時調(diào)用吐辙,關(guān)閉資源。
(2)createInput
該方式是通過 NebulaInputFormat 類實現(xiàn)的蘸劈,該類繼承自 RichInputFormat 并實現(xiàn)了以下方法:
- openInputFormat
準備 inputFormat昏苏,獲取連接。 - closeInputFormat
數(shù)據(jù)讀取完成威沫,釋放資源贤惯,關(guān)閉 Nebula Graph 服務(wù)的連接。 - getStatistics
獲取數(shù)據(jù)源的基本統(tǒng)計信息棒掠。 - createInputSplits
基于配置的 partition 參數(shù)創(chuàng)建 GenericInputSplit孵构。 - getInputSplitAssigner
返回輸入的 split 分配器,按原始計算的順序返回 Source 的所有 split烟很。 - open
開始 inputFormat 的數(shù)據(jù)讀取颈墅,將讀取的數(shù)據(jù)轉(zhuǎn)換 Flink 的數(shù)據(jù)格式棒假,構(gòu)造迭代器。 - close
數(shù)據(jù)讀取完成精盅,打印讀取日志帽哑。 - reachedEnd
是否讀取完成 - nextRecord
通過迭代器獲取下一條數(shù)據(jù)
通過 addSource 讀取 Source 數(shù)據(jù)得到的是 Flink 的 DataStreamSource,表示 DataStream 的起點叹俏。
通過 createInput 讀取數(shù)據(jù)得到的是 Flink 的 DataSource妻枕,DataSource 是一個創(chuàng)建新數(shù)據(jù)集的 Operator,這個 Operator 可作為進一步轉(zhuǎn)換的數(shù)據(jù)集粘驰。DataSource 可以通過 withParameters 封裝配置參數(shù)進行其他的操作屡谐。
1.4 自定義 Nebula Graph Source 應(yīng)用實踐
使用 Flink 讀取 Nebula Graph 圖數(shù)據(jù)時,需要構(gòu)造 NebulaSourceFunction 和 NebulaOutputFormat蝌数,并通過 Flink 的 addSource 或 createInput 方法注冊數(shù)據(jù)源進行 Nebula Graph 數(shù)據(jù)讀取愕掏。
構(gòu)造 NebulaSourceFunction 和 NebulaOutputFormat 時需要進行客戶端參數(shù)的配置和執(zhí)行參數(shù)的配置,說明如下:
配置項說明:
- NebulaClientOptions
- 配置 address顶伞,NebulaSource 需要配置 Nebula Graph Metad 服務(wù)的地址饵撑。
- 配置 username
- 配置 password
- VertexExecutionOptions
- 配置 GraphSpace
- 配置要讀取的 tag
- 配置要讀取的字段集
- 配置是否讀取所有字段,默認為 false唆貌, 若配置為 true 則字段集配置無效
- 配置每次讀取的數(shù)據(jù)量 limit滑潘,默認 2000
- EdgeExecutionOptions
- 配置 GraphSpace
- 配置要讀取的 edge
- 配置要讀取的字段集
- 配置是否讀取所有字段,默認為 false锨咙, 若配置為 true 則字段集配置無效
- 配置每次讀取的數(shù)據(jù)量 limit语卤,默認 2000
// 構(gòu)造 Nebula Graph 客戶端連接需要的參數(shù)
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
.NebulaClientOptionsBuilder()
.setAddress("127.0.0.1:45500")
.build();
// 創(chuàng)建 connectionProvider
NebulaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
// 構(gòu)造 Nebula Graph 數(shù)據(jù)讀取需要的參數(shù)
List<String> cols = Arrays.asList("name", "age");
VertexExecutionOptions sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSource")
.setTag(tag)
.setFields(cols)
.setLimit(100)
.builder();
// 構(gòu)造 NebulaInputFormat
NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider)
.setExecutionOptions(sourceExecutionOptions);
// 方式 1 使用 createInput 方式注冊 Nebula Graph 數(shù)據(jù)源
DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment()
.createInput(inputFormat);
// 方式 2 使用 addSource 方式注冊 Nebula Graph 數(shù)據(jù)源
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider)
.setExecutionOptions(sourceExecutionOptions);
DataStreamSource<Row> dataSource2 = StreamExecutionEnvironment.getExecutionEnvironment()
.addSource(sourceFunction);
Nebula Source Demo 編寫完成后可以打包提交到 Flink 集群執(zhí)行。
示例程序讀取 Nebula Graph 的點數(shù)據(jù)并打印酪刀,該作業(yè)以 Nebula Graph 作為 Source粹舵,以 print 作為 Sink,執(zhí)行結(jié)果如下:
Source sent 數(shù)據(jù)為 59,671,064 條骂倘,Sink received 數(shù)據(jù)為 59,671,064 條眼滤。
二、Connector Sink
Nebula Flink Connector 中的 Sink 即 Nebula Graph 圖數(shù)據(jù)庫稠茂。Flink 提供了豐富的 Connector 組件允許用戶自定義數(shù)據(jù)池來接收 Flink 所處理的數(shù)據(jù)流柠偶。
2.1 Sink 簡介
Sink 是 Flink 處理完 Source 后數(shù)據(jù)的輸出情妖,主要負責實時計算結(jié)果的輸出和持久化睬关。比如:將數(shù)據(jù)流寫入標準輸出、寫入文件毡证、寫入 Sockets电爹、寫入外部系統(tǒng)等。
Flink 的 Sink 能力主要是通過調(diào)用數(shù)據(jù)流的 write 相關(guān) API 和 DataStream.addSink 兩種方式來實現(xiàn)數(shù)據(jù)流的外部存儲料睛。
類似于 Flink Connector 的 Source丐箩,Sink 也允許用戶自定義來支持豐富的外部數(shù)據(jù)系統(tǒng)作為 Flink 的數(shù)據(jù)池摇邦。
Flink Sink 的使用方式如下:
本章主要介紹如何通過自定義 Sink 的方式實現(xiàn) Nebula Graph Sink。
2.2 自定義 Sink
在 Flink 中可以使用 DataStream.addSink
和 DataStream.writeUsingOutputFormat
的方式將 Flink 數(shù)據(jù)流寫入外部自定義數(shù)據(jù)池屎勘。
Flink 已經(jīng)提供了若干實現(xiàn)好了的 Sink Functions
施籍,也可以通過實現(xiàn) SinkFunction
以及繼承 RichOutputFormat
來實現(xiàn)自定義的 Sink。
2.3 自定義 Nebula Graph Sink 實現(xiàn)原理
Nebula Flink Connector 中實現(xiàn)了自定義的 NebulaSinkFunction概漱,開發(fā)者通過調(diào)用 DataSource.addSink 方法并將 NebulaSinkFunction 對象作為參數(shù)傳入即可實現(xiàn)將 Flink 數(shù)據(jù)流寫入 Nebula Graph丑慎。
Nebula Flink Connector 使用的是 Flink 的 1.11-SNAPSHOT 版本,該版本中已經(jīng)廢棄了使用 writeUsingOutputFormat 方法來定義輸出端的接口瓤摧。
源碼如下竿裂,所以請注意在使用自定義 Nebula Graph Sink 時請采用 DataStream.addSink 的方式。
/** @deprecated */
@Deprecated
@PublicEvolving
public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
return this.addSink(new OutputFormatSinkFunction(format));
}
Nebula Graph Sink 實現(xiàn)類圖如下:
其中最重要的兩個類是 NebulaSinkFunction 和 NebulaBatchOutputFormat照弥。
NebulaSinkFunction 繼承自 AbstractRichFunction 并實現(xiàn)了以下方法:
- open
調(diào)用 NebulaBatchOutputFormat 的 open 方法腻异,進行資源準備。 - close
調(diào)用 NebulaBatchOutputFormat 的 close 方法这揣,進行資源釋放悔常。 - invoke
是 Sink 中的核心方法, 調(diào)用 NebulaBatchOutputFormat 中的 write 方法進行數(shù)據(jù)寫入给赞。 - flush
調(diào)用 NebulaBatchOutputFormat 的 flush 方法進行數(shù)據(jù)的提交这嚣。
NebulaBatchOutputFormat 繼承自 AbstractNebulaOutPutFormat,AbstractNebulaOutPutFormat 繼承自 RichOutputFormat塞俱,主要實現(xiàn)的方法有:
- open
準備圖數(shù)據(jù)庫 Nebula Graph 的 Graphd 服務(wù)的連接姐帚,并初始化數(shù)據(jù)寫入執(zhí)行器 nebulaBatchExecutor - close
提交最后批次數(shù)據(jù),等待最后提交的回調(diào)結(jié)果并關(guān)閉服務(wù)連接等資源障涯。 - writeRecord
核心方法罐旗,將數(shù)據(jù)寫入 nebulaBufferedRow 中,并在達到配置的批量寫入 Nebula Graph 上限時提交寫入唯蝶。Nebula Graph Sink 的寫入操作是異步的九秀,所以需要執(zhí)行回調(diào)來獲取執(zhí)行結(jié)果。 - flush
當 bufferRow 存在數(shù)據(jù)時粘我,將數(shù)據(jù)提交到 Nebula Graph 中鼓蜒。
在 AbstractNebulaOutputFormat 中調(diào)用了 NebulaBatchExecutor 進行數(shù)據(jù)的批量管理和批量提交,并通過定義回調(diào)函數(shù)接收批量提交的結(jié)果征字,代碼如下:
/**
* write one record to buffer
*/
@Override
public final synchronized void writeRecord(T row) throws IOException {
nebulaBatchExecutor.addToBatch(row);
if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {
commit();
}
}
/**
* put record into buffer
*
* @param record represent vertex or edge
*/
void addToBatch(T record) {
boolean isVertex = executionOptions.getDataType().isVertex();
NebulaOutputFormatConverter converter;
if (isVertex) {
converter = new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) executionOptions);
} else {
converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions);
}
String value = converter.createValue(record, executionOptions.getPolicy());
if (value == null) {
return;
}
nebulaBufferedRow.putRow(value);
}
/**
* commit batch insert statements
*/
private synchronized void commit() throws IOException {
graphClient.switchSpace(executionOptions.getGraphSpace());
future = nebulaBatchExecutor.executeBatch(graphClient);
// clear waiting rows
numPendingRow.compareAndSet(executionOptions.getBatch(),0);
}
/**
* execute the insert statement
*
* @param client Asynchronous graph client
*/
ListenableFuture executeBatch(AsyncGraphClientImpl client) {
String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields());
String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows());
// construct insert statement
String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, executionOptions.getDataType(), executionOptions.getLabel(), propNames, values);
// execute insert statement
ListenableFuture<Optional<Integer>> execResult = client.execute(exec);
// define callback function
Futures.addCallback(execResult, new FutureCallback<Optional<Integer>>() {
@Override
public void onSuccess(Optional<Integer> integerOptional) {
if (integerOptional.isPresent()) {
if (integerOptional.get() == ErrorCode.SUCCEEDED) {
LOG.info("batch insert Succeed");
} else {
LOG.error(String.format("batch insert Error: %d",
integerOptional.get()));
}
} else {
LOG.error("batch insert Error");
}
}
@Override
public void onFailure(Throwable throwable) {
LOG.error("batch insert Error");
}
});
nebulaBufferedRow.clean();
return execResult;
}
由于 Nebula Graph Sink 的寫入是批量都弹、異步的,所以在最后業(yè)務(wù)結(jié)束 close 資源之前需要將緩存中的批量數(shù)據(jù)提交且等待寫入操作的完成匙姜,以防在寫入提交之前提前把 Nebula Graph Client 關(guān)閉畅厢,代碼如下:
/**
* commit the batch write operator before release connection
*/
@Override
public final synchronized void close() throws IOException {
if(numPendingRow.get() > 0){
commit();
}
while(!future.isDone()){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.error("sleep interrupted, ", e);
}
}
super.close();
}
2.4 自定義 Nebula Graph Sink 應(yīng)用實踐
Flink 將處理完成的數(shù)據(jù) Sink 到 Nebula Graph 時,需要將 Flink 數(shù)據(jù)流進行 map 轉(zhuǎn)換成 Nebula Graph Sink 可接收的數(shù)據(jù)格式氮昧。自定義 Nebula Graph Sink 的使用方式是通過 addSink 形式框杜,將 NebulaSinkFunction 作為參數(shù)傳給 addSink 方法來實現(xiàn) Flink 數(shù)據(jù)流的寫入浦楣。
- NebulaClientOptions
- 配置 address,NebulaSource 需要配置 Nebula Graph Graphd 服務(wù)的地址咪辱。
- 配置 username
- 配置 password
- VertexExecutionOptions
- 配置 GraphSpace
- 配置要寫入的 tag
- 配置要寫入的字段集
- 配置寫入的點 ID 所在 Flink 數(shù)據(jù)流 Row 中的索引
- 配置批量寫入 Nebula Graph 的數(shù)量振劳,默認 2000
- EdgeExecutionOptions
- 配置 GraphSpace
- 配置要寫入的 edge
- 配置要寫入的字段集
- 配置寫入的邊 src-id 所在 Flink 數(shù)據(jù)流 Row 中的索引
- 配置寫入的邊 dst-id 所在 Flink 數(shù)據(jù)流 Row 中的索引
- 配置寫入的邊 rank 所在 Flink 數(shù)據(jù)流 Row 中的索引,不配則無 rank
- 配置批量寫入 Nebula Graph 的數(shù)量油狂,默認 2000
/// 構(gòu)造 Nebula Graphd 客戶端連接需要的參數(shù)
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
.NebulaClientOptionsBuilder()
.setAddress("127.0.0.1:3699")
.build();
NebulaConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
// 構(gòu)造 Nebula Graph 寫入操作參數(shù)
List<String> cols = Arrays.asList("name", "age")
ExecutionOptions sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setTag(tag)
.setFields(cols)
.setIdIndex(0)
.setBatch(20)
.builder();
// 寫入 Nebula Graph
dataSource.addSink(nebulaSinkFunction);
Nebula Graph Sink 的 Demo 程序以 Nebula Graph 的 space:flinkSource 作為 Source 讀取數(shù)據(jù)澎迎,進行 map 類型轉(zhuǎn)換后 Sink 入 Nebula Graph 的 space:flinkSink,對應(yīng)的應(yīng)用場景為將 Nebula Graph 中一個 space 的數(shù)據(jù)流入另一個 space 中选调。
三夹供、 Catalog
Flink 1.11.0 之前,用戶如果依賴 Flink 的 Source/Sink 讀寫外部數(shù)據(jù)源時仁堪,必須要手動讀取對應(yīng)數(shù)據(jù)系統(tǒng)的 Schema哮洽。比如,要讀寫 Nebula Graph弦聂,則必須先保證明確地知曉在 Nebula Graph 中的 Schema 信息鸟辅。但是這樣會有一個問題,當 Nebula Graph 中的 Schema 發(fā)生變化時莺葫,也需要手動更新對應(yīng)的 Flink 任務(wù)以保持類型匹配匪凉,任何不匹配都會造成運行時報錯使作業(yè)失敗。這個操作冗余且繁瑣捺檬,體驗極差再层。
1.11.0 版本后,用戶使用 Flink Connector 時可以自動獲取表的 Schema堡纬∧羰埽可以在不了解外部系統(tǒng)數(shù)據(jù) Schema 的情況下進行數(shù)據(jù)匹配。
目前 Nebula Flink Connector 中已支持數(shù)據(jù)的讀寫烤镐,要實現(xiàn) Schema 的匹配則需要為 Flink Connector 實現(xiàn) Catalog 的管理蛋济。但為了確保 Nebula Graph 中數(shù)據(jù)的安全性,Nebula Flink Connector 只支持 Catalog 的讀操作炮叶,不允許進行 Catalog 的修改和寫入碗旅。
訪問 Nebula Graph 指定類型的數(shù)據(jù)時,完整路徑應(yīng)該是以下格式:<graphSpace>.<VERTEX.tag>
或者 <graphSpace>.<EDGE.edge>
具體使用方式如下:
String catalogName = "testCatalog";
String defaultSpace = "flinkSink";
String username = "root";
String password = "nebula";
String address = "127.0.0.1:45500";
String table = "VERTEX.player"
// define Nebula catalog
Catalog catalog = NebulaCatalogUtils.createNebulaCatalog(catalogName,defaultSpace, address, username, password);
// define Flink table environment
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(bsEnv);
// register customed nebula catalog
tEnv.registerCatalog(catalogName, catalog);
// use customed nebula catalog
tEnv.useCatalog(catalogName);
// show graph spaces of Nebula Graph
String[] spaces = tEnv.listDatabases();
// show tags and edges of Nebula Graph
tEnv.useDatabase(defaultSpace);
String[] tables = tEnv.listTables();
// check tag player exist in defaultSpace
ObjectPath path = new ObjectPath(defaultSpace, table);
assert catalog.tableExists(path) == true
// get nebula tag schema
CatalogBaseTable table = catalog.getTable(new ObjectPath(defaultSpace, table));
table.getSchema();
Nebula Flink Connector 支持的其他 Catalog 接口請查看 GitHub 代碼 NebulaCatalog.java镜悉。
四祟辟、 Exactly-once
Flink Connector 的 Exactly-once 是指 Flink 借助于 checkpoint 機制保證每個輸入事件只對最終結(jié)果影響一次,在數(shù)據(jù)處理過程中即使出現(xiàn)故障积瞒,也不會存在數(shù)據(jù)重復和丟失的情況川尖。
為了提供端到端的 Exactly-once 語義登下,F(xiàn)link 的外部數(shù)據(jù)系統(tǒng)也必須提供提交或回滾的方法茫孔,然后通過 Flink 的 checkpoint 機制協(xié)調(diào)叮喳。Flink 提供了實現(xiàn)端到端的 Exactly-once 的抽象,即實現(xiàn)二階段提交的抽象類 TwoPhaseCommitSinkFunction缰贝。
想為數(shù)據(jù)輸出端實現(xiàn) Exactly-once馍悟,則需要實現(xiàn)四個函數(shù):
- beginTransaction
在事務(wù)開始前,在目標文件系統(tǒng)的臨時目錄創(chuàng)建一個臨時文件剩晴,隨后可以在數(shù)據(jù)處理時將數(shù)據(jù)寫入此文件锣咒。 - preCommit
在預提交階段,關(guān)閉文件不再寫入赞弥。為下一個 checkpoint 的任何后續(xù)文件寫入啟動一個新事務(wù)毅整。 - commit
在提交階段,將預提交階段的文件原子地移動到真正的目標目錄绽左。二階段提交過程會增加輸出數(shù)據(jù)可見性的延遲悼嫉。 - abort
在終止階段,刪除臨時文件拼窥。
根據(jù)上述函數(shù)可看出戏蔑,F(xiàn)link 的二階段提交對外部數(shù)據(jù)源有要求,即 Source 數(shù)據(jù)源必須具備重發(fā)功能鲁纠,Sink 數(shù)據(jù)池必須支持事務(wù)提交和冪等寫总棵。
Nebula Graph v1.1.0 雖然不支持事務(wù),但其寫入操作是冪等的改含,即同一條數(shù)據(jù)的多次寫入結(jié)果是一致的情龄。因此可以通過 checkpoint 機制實現(xiàn) Nebula Flink Connector 的 At-least-Once 機制,根據(jù)多次寫入的冪等性可以間接實現(xiàn) Sink 的 Exactly-once捍壤。
要使用 Nebula Graph Sink 的容錯性刃唤,請確保在 Flink 的執(zhí)行環(huán)境中開啟了 checkpoint 配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000) // checkpoint every 10000 msecs
.getCheckpointConfig()
.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
Reference
- Nebula Source Demo [testNebulaSource]:https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java
- Nebula Sink Demo [testSourceSink]:https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java
- Apache Flink 源碼:https://github.com/apache/flink
- ApacheFlink 零基礎(chǔ)入門:https://www.infoq.cn/theme/28
- Flink 文檔:https://flink.apache.org/flink-architecture.html
- Flink 實踐文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.12/
- flink-connector-jdbc 源碼:https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc
- Flink JDBC Catalog 詳解:https://cloud.tencent.com/developer/article/1697913
喜歡這篇文章?來來來白群,給我們的 GitHub 點個 star 表鼓勵啦~~ ???♂????♀? [手動跪謝]
交流圖數(shù)據(jù)庫技術(shù)尚胞?交個朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你進交流群~~