本文已授權(quán)「Flink中文社區(qū)」微信公眾號發(fā)布并標(biāo)注原創(chuàng)李根。
前言
今天本來想搞篇走讀StreamingFileSink源碼的文章校摩,但是考慮到Flink 1.11版本發(fā)布已經(jīng)有段時(shí)間了,于是就放松一下,體驗(yàn)新特性吧未桥。
與1.10版本相比笔刹,1.11版本最為顯著的一個(gè)改進(jìn)是Hive Integration顯著增強(qiáng),也就是真正意義上實(shí)現(xiàn)了基于Hive的流批一體冬耿。本文用簡單的本地示例來體驗(yàn)Hive Streaming的便利性舌菜。
添加相關(guān)依賴
測試集群上的Hive版本為1.1.0,Hadoop版本為2.6.0亦镶,Kafka版本為1.0.1日月。
<properties>
<scala.bin.version>2.11</scala.bin.version>
<flink.version>1.11.0</flink.version>
<flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version>
<hive.version>1.1.0</hive.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>${flink-shaded-hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
另外,別忘了找到hdfs-site.xml和hive-site.xml缤骨,并將其加入項(xiàng)目爱咬。
創(chuàng)建執(zhí)行環(huán)境
Flink 1.11的Table/SQL API中,F(xiàn)ileSystem Connector是靠一個(gè)增強(qiáng)版StreamingFileSink組件實(shí)現(xiàn)绊起,在源碼中名為StreamingFileWriter精拟。我們知道,只有在checkpoint成功時(shí)虱歪,StreamingFileSink寫入的文件才會由pending狀態(tài)變成finished狀態(tài)串前,從而能夠安全地被下游讀取。所以实蔽,我們一定要打開checkpointing荡碾,并設(shè)定合理的間隔。
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)
val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
注冊HiveCatalog
val catalogName = "my_catalog"
val catalog = new HiveCatalog(
catalogName, // catalog name
"default", // default database
"/Users/lmagic/develop", // Hive config (hive-site.xml) directory
"1.1.0" // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)
創(chuàng)建Kafka流表
Kafka topic中存儲的是JSON格式的埋點(diǎn)日志局装,建表時(shí)用計(jì)算列生成事件時(shí)間與水印坛吁。1.11版本SQL Kafka Connector的參數(shù)相比1.10版本有一定簡化。
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")
tableEnv.executeSql(
"""
|CREATE TABLE stream_tmp.analytics_access_log_kafka (
| ts BIGINT,
| userId BIGINT,
| eventType STRING,
| fromType STRING,
| columnType STRING,
| siteId BIGINT,
| grouponId BIGINT,
| partnerId BIGINT,
| merchandiseId BIGINT,
| procTime AS PROCTIME(),
| eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
| WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'ods_analytics_access_log',
| 'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092'
| 'properties.group.id' = 'flink_hive_integration_exp_1',
| 'scan.startup.mode' = 'latest-offset',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
""".stripMargin
)
前面已經(jīng)注冊了HiveCatalog铐尚,故在Hive中可以觀察到創(chuàng)建的Kafka流表的元數(shù)據(jù)(注意該表并沒有事實(shí)上的列)拨脉。
hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;
OK
# col_name data_type comment
# Detailed Table Information
Database: stream_tmp
Owner: null
CreateTime: Wed Jul 15 18:25:09 CST 2020
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector kafka
flink.format json
flink.json.fail-on-missing-field false
flink.json.ignore-parse-errors true
flink.properties.bootstrap.servers kafka110:9092,kafka111:9092,kafka112:9092
flink.properties.group.id flink_hive_integration_exp_1
flink.scan.startup.mode latest-offset
flink.schema.0.data-type BIGINT
flink.schema.0.name ts
flink.schema.1.data-type BIGINT
flink.schema.1.name userId
flink.schema.10.data-type TIMESTAMP(3)
flink.schema.10.expr TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))
flink.schema.10.name eventTime
flink.schema.2.data-type VARCHAR(2147483647)
flink.schema.2.name eventType
# 略......
flink.schema.9.data-type TIMESTAMP(3) NOT NULL
flink.schema.9.expr PROCTIME()
flink.schema.9.name procTime
flink.schema.watermark.0.rowtime eventTime
flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
flink.schema.watermark.0.strategy.expr `eventTime` - INTERVAL '15' SECOND
flink.topic ods_analytics_access_log
is_generic true
transient_lastDdlTime 1594808709
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 1.797 seconds, Fetched: 61 row(s)
創(chuàng)建Hive表
Flink SQL提供了兼容HiveQL風(fēng)格的DDL,指定SqlDialect.HIVE
即可(DML兼容還在開發(fā)中)宣增。
為了方便觀察結(jié)果玫膀,以下的表采用了天/小時(shí)/分鐘的三級分區(qū),實(shí)際應(yīng)用中可以不用這樣細(xì)的粒度(10分鐘甚至1小時(shí)的分區(qū)可能更合適)爹脾。
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")
tableEnv.executeSql(
"""
|CREATE TABLE hive_tmp.analytics_access_log_hive (
| ts BIGINT,
| user_id BIGINT,
| event_type STRING,
| from_type STRING,
| column_type STRING,
| site_id BIGINT,
| groupon_id BIGINT,
| partner_id BIGINT,
| merchandise_id BIGINT
|) PARTITIONED BY (
| ts_date STRING,
| ts_hour STRING,
| ts_minute STRING
|) STORED AS PARQUET
|TBLPROPERTIES (
| 'sink.partition-commit.trigger' = 'partition-time',
| 'sink.partition-commit.delay' = '1 min',
| 'sink.partition-commit.policy.kind' = 'metastore,success-file',
| 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
|)
""".stripMargin
)
Hive表的參數(shù)復(fù)用了SQL FileSystem Connector的相關(guān)參數(shù)帖旨,與分區(qū)提交(partition commit)密切相關(guān)。僅就上面出現(xiàn)的4個(gè)參數(shù)簡單解釋一下灵妨。
-
sink.partition-commit.trigger
:觸發(fā)分區(qū)提交的時(shí)間特征解阅。默認(rèn)為processing-time
,即處理時(shí)間泌霍,很顯然在有延遲的情況下货抄,可能會造成數(shù)據(jù)分區(qū)錯(cuò)亂。所以這里使用partition-time
,即按照分區(qū)時(shí)間戳(即分區(qū)內(nèi)數(shù)據(jù)對應(yīng)的事件時(shí)間)來提交蟹地。 -
partition.time-extractor.timestamp-pattern
:分區(qū)時(shí)間戳的抽取格式积暖。需要寫成yyyy-MM-dd HH:mm:ss
的形式,并用Hive表中相應(yīng)的分區(qū)字段做占位符替換怪与。顯然呀酸,Hive表的分區(qū)字段值來自流表中定義好的事件時(shí)間,后面會看到琼梆。 -
sink.partition-commit.delay
:觸發(fā)分區(qū)提交的延遲。在時(shí)間特征設(shè)為partition-time
的情況下窿吩,當(dāng)水印時(shí)間戳大于分區(qū)創(chuàng)建時(shí)間加上此延遲時(shí)茎杂,分區(qū)才會真正提交。此值最好與分區(qū)粒度相同纫雁,例如若Hive表按1小時(shí)分區(qū)煌往,此參數(shù)可設(shè)為1 h
,若按10分鐘分區(qū)轧邪,可設(shè)為10 min
刽脖。 -
sink.partition-commit.policy.kind
:分區(qū)提交策略,可以理解為使分區(qū)對下游可見的附加操作忌愚。metastore
表示更新Hive Metastore中的表元數(shù)據(jù)曲管,success-file
則表示在分區(qū)內(nèi)創(chuàng)建_SUCCESS標(biāo)記文件。
當(dāng)然硕糊,SQL FileSystem Connector的功能并不限于此院水,還有很大自定義的空間(如可以自定義分區(qū)提交策略以合并小文件等)。具體可參見官方文檔简十。
流式寫入Hive
注意將流表中的事件時(shí)間轉(zhuǎn)化為Hive的分區(qū)檬某。
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(
"""
|INSERT INTO hive_tmp.analytics_access_log_hive
|SELECT
| ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId,
| DATE_FORMAT(eventTime,'yyyy-MM-dd'),
| DATE_FORMAT(eventTime,'HH'),
| DATE_FORMAT(eventTime,'mm')
|FROM stream_tmp.analytics_access_log_kafka
|WHERE merchandiseId > 0
""".stripMargin
)
來觀察一下流式Sink的結(jié)果吧。
上文設(shè)定的checkpoint interval是20秒螟蝙,可以看到恢恼,上圖中的數(shù)據(jù)文件恰好是以20秒的間隔寫入的。由于并行度為3胰默,所以每次寫入會生成3個(gè)文件场斑。分區(qū)內(nèi)所有數(shù)據(jù)寫入完畢后,會同時(shí)生成_SUCCESS文件牵署。如果是正在寫入的分區(qū)和簸,則會看到.inprogress文件。
通過Hive查詢一下碟刺,確定數(shù)據(jù)的時(shí)間無誤锁保。
hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT)))
> FROM hive_tmp.analytics_access_log_hive
> WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';
OK
2020-07-15 23:23:00 2020-07-15 23:23:59
Time taken: 1.115 seconds, Fetched: 1 row(s)
流式讀取Hive
要將Hive表作為流式Source,需要啟用dynamic table options,并通過table hints來指定Hive數(shù)據(jù)流的參數(shù)爽柒。以下是簡單地通過Hive計(jì)算商品PV的例子吴菠。
tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
val result = tableEnv.sqlQuery(
"""
|SELECT merchandise_id,count(1) AS pv
|FROM hive_tmp.analytics_access_log_hive
|/*+ OPTIONS(
| 'streaming-source.enable' = 'true',
| 'streaming-source.monitor-interval' = '1 min',
| 'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
|) */
|WHERE event_type = 'shtOpenGoodsDetail'
|AND ts_date >= '2020-07-15'
|GROUP BY merchandise_id
|ORDER BY pv DESC LIMIT 10
""".stripMargin
)
result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()
三個(gè)table hint參數(shù)的含義解釋如下。
-
streaming-source.enable
:設(shè)為true浩村,表示該Hive表可以作為Source做葵。 -
streaming-source.monitor-interval
:感知Hive表新增數(shù)據(jù)的周期,以上設(shè)為1分鐘心墅。對于分區(qū)表而言酿矢,則是監(jiān)控新分區(qū)的生成,以增量讀取數(shù)據(jù)怎燥。 -
streaming-source.consume-start-offset
:開始消費(fèi)的時(shí)間戳瘫筐,同樣需要寫成yyyy-MM-dd HH:mm:ss
的形式。
更加具體的說明仍然可參見官方文檔(吐槽一句铐姚,這份文檔的Chinglish味道真的太重了=策肝。=
最后,由于SQL語句中有ORDER BY和LIMIT邏輯隐绵,所以需要調(diào)用toRetractStream()方法轉(zhuǎn)化為回撤流之众,即可輸出結(jié)果。
The End
Flink 1.11的Hive Streaming功能大大提高了Hive數(shù)倉的實(shí)時(shí)性依许,對ETL作業(yè)非常有利棺禾,同時(shí)還能夠滿足流式持續(xù)查詢的需求,具有一定的靈活性峭跳。
還有事情要做帘睦,民那晚安。