Due to project requirements we need to run Flink on GCP, so GCS is the natural choice of file system. I searched around a lot online; for well-known reasons Google Cloud sees little use in mainland China, so material is scarce, and the official Flink documentation covers this topic only briefly. To summarize, the steps to use GCS as the state backend are as follows:
- Use Flink's HDFS support mechanism to access GCS
- 創(chuàng)建core-site.xml
因為本文使用環(huán)境是Flink standalone環(huán)境什燕,并無hfds粘勒,因此首先需要創(chuàng)建core-site.xml.
```xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
    </property>
    <property>
        <name>fs.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    </property>
    <property>
        <name>google.cloud.auth.service.account.json.keyfile</name>
        <value>/data/flink-1.9.1/conf/gcs-service-account.json</value>
    </property>
    <property>
        <name>fs.gs.project.id</name>
        <value>XXX</value>
        <description>
            Required. Google Cloud Project ID with access to configured GCS buckets.
        </description>
    </property>
</configuration>
```
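Before restarting Flink it can be worth checking that the file is well formed and actually carries the two `fs.gs` bindings, since a silent typo here surfaces later only as a scheme-resolution failure. A minimal stand-alone sketch using only the JDK's built-in XML parser (the inlined XML is a trimmed copy of the file above; `CoreSiteCheck` and its structure are illustrative, not part of any Flink API):

```java
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class CoreSiteCheck {
    // Trimmed inline copy of core-site.xml; in practice, read the real file
    // from the Flink conf directory instead.
    static final String XML =
        "<?xml version=\"1.0\"?><configuration>"
      + "<property><name>fs.gs.impl</name>"
      + "<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value></property>"
      + "<property><name>fs.AbstractFileSystem.gs.impl</name>"
      + "<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value></property>"
      + "</configuration>";

    // Parse a Hadoop-style configuration document into a name -> value map.
    public static Map<String, String> parseProperties(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
            .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        Map<String, String> props = new HashMap<>();
        NodeList nodes = doc.getElementsByTagName("property");
        for (int i = 0; i < nodes.getLength(); i++) {
            Element p = (Element) nodes.item(i);
            props.put(p.getElementsByTagName("name").item(0).getTextContent(),
                      p.getElementsByTagName("value").item(0).getTextContent());
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> props = parseProperties(XML);
        System.out.println("fs.gs.impl = " + props.get("fs.gs.impl"));
    }
}
```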
- Configure flink-conf.yaml so that Flink can find the core-site.xml configuration file

```yaml
fs.hdfs.hadoopconf: /data/flink-1.9.1/conf/
```
- Use GCS as the state backend in your program

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setFailOnCheckpointingErrors(false);
checkpointConfig.setCheckpointInterval(10000);        // checkpoint every 10 s
checkpointConfig.setMinPauseBetweenCheckpoints(5000); // at least 5 s between checkpoints
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

// Checkpoints go to the GCS bucket; the second argument enables incremental checkpoints.
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
        "gs://flinkcheckpoint", true);
env.setStateBackend((StateBackend) rocksDBStateBackend);
```
- Put the required jars on a classpath Flink can load
- gcs-connector-hadoop2-2.0.0.jar
- gcsio-2.0.0.jar
- google-api-client-1.30.1.jar
- google-api-client-jackson2-1.30.1.jar
- google-api-client-java6-1.30.1.jar
- google-api-services-storage-v1-rev20190624-1.30.1.jar
- google-extensions-0.4.jar
- google-http-client-1.30.1.jar
- google-http-client-jackson2-1.30.1.jar
- google-oauth-client-1.30.1.jar
- google-oauth-client-java6-1.30.1.jar
- flink-shaded-hadoop2-2.8.3-1.8.3.jar
If the program reports the error below, check the Flink logs; the usual cause is a missing jar. In particular, flink-shaded-hadoop2-2.8.3-1.8.3.jar is required, so make sure it has been placed somewhere Flink can load it.

```
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
```
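A quick way to tell whether a given jar is actually visible to the JVM is to probe for one of its classes with `Class.forName`; the fully qualified class name below is the one configured for `fs.gs.impl` above, and the `ClasspathCheck` helper itself is just an illustrative sketch:

```java
public class ClasspathCheck {
    // Returns true if the named class can be loaded from the current classpath.
    static boolean onClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The GCS connector's FileSystem implementation, as configured in core-site.xml.
        String gcsFs = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem";
        System.out.println(gcsFs + " present: " + onClasspath(gcsFs));
    }
}
```

Running this inside the same JVM setup as Flink (or with the same `lib/` directory on the classpath) prints whether the connector class resolves, which narrows the error down to a packaging problem rather than a configuration one.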
- 創(chuàng)建并下載GCP上的相關project的json格式的service account,并將其放置在步驟1 google.cloud.auth.service.account.json.keyfile里的地址下乘陪,fs.gs.project.id配置為其project id.
- With everything in place, run the Flink job, then check the corresponding directory in GCS to confirm that checkpoint files are being created.