Flink 1.11新特性之SQL Hive Streaming簡單示例

本文已授權(quán)「Flink中文社區(qū)」微信公眾號發(fā)布并標(biāo)注原創(chuàng)李根。

前言

今天本來想搞篇走讀StreamingFileSink源碼的文章校摩,但是考慮到Flink 1.11版本發(fā)布已經(jīng)有段時(shí)間了,于是就放松一下,體驗(yàn)新特性吧未桥。

與1.10版本相比笔刹,1.11版本最為顯著的一個(gè)改進(jìn)是Hive Integration顯著增強(qiáng),也就是真正意義上實(shí)現(xiàn)了基于Hive的流批一體冬耿。本文用簡單的本地示例來體驗(yàn)Hive Streaming的便利性舌菜。

添加相關(guān)依賴

測試集群上的Hive版本為1.1.0,Hadoop版本為2.6.0亦镶,Kafka版本為1.0.1日月。

<properties>
  <scala.bin.version>2.11</scala.bin.version>
  <flink.version>1.11.0</flink.version>
  <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version>
  <hive.version>1.1.0</hive.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>${flink-shaded-hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
  </dependency>

另外,別忘了找到hdfs-site.xml和hive-site.xml缤骨,并將其加入項(xiàng)目爱咬。

創(chuàng)建執(zhí)行環(huán)境

Flink 1.11的Table/SQL API中,F(xiàn)ileSystem Connector是靠一個(gè)增強(qiáng)版StreamingFileSink組件實(shí)現(xiàn)绊起,在源碼中名為StreamingFileWriter精拟。我們知道,只有在checkpoint成功時(shí)虱歪,StreamingFileSink寫入的文件才會由pending狀態(tài)變成finished狀態(tài)串前,從而能夠安全地被下游讀取。所以实蔽,我們一定要打開checkpointing荡碾,并設(shè)定合理的間隔。

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)

val tableEnvSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

注冊HiveCatalog

val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,              // catalog name
  "default",                // default database
  "/Users/lmagic/develop",  // Hive config (hive-site.xml) directory
  "1.1.0"                   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)

創(chuàng)建Kafka流表

Kafka topic中存儲的是JSON格式的埋點(diǎn)日志局装,建表時(shí)用計(jì)算列生成事件時(shí)間與水印坛吁。1.11版本SQL Kafka Connector的參數(shù)相比1.10版本有一定簡化。

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")

tableEnv.executeSql(
  """
    |CREATE TABLE stream_tmp.analytics_access_log_kafka (
    |  ts BIGINT,
    |  userId BIGINT,
    |  eventType STRING,
    |  fromType STRING,
    |  columnType STRING,
    |  siteId BIGINT,
    |  grouponId BIGINT,
    |  partnerId BIGINT,
    |  merchandiseId BIGINT,
    |  procTime AS PROCTIME(),
    |  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
    |  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'ods_analytics_access_log',
    |  'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092'
    |  'properties.group.id' = 'flink_hive_integration_exp_1',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
  """.stripMargin
)

前面已經(jīng)注冊了HiveCatalog铐尚,故在Hive中可以觀察到創(chuàng)建的Kafka流表的元數(shù)據(jù)(注意該表并沒有事實(shí)上的列)拨脉。

hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;
OK
# col_name              data_type               comment


# Detailed Table Information
Database:               stream_tmp
Owner:                  null
CreateTime:             Wed Jul 15 18:25:09 CST 2020
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka
Table Type:             MANAGED_TABLE
Table Parameters:
    flink.connector         kafka
    flink.format            json
    flink.json.fail-on-missing-field    false
    flink.json.ignore-parse-errors  true
    flink.properties.bootstrap.servers  kafka110:9092,kafka111:9092,kafka112:9092
    flink.properties.group.id   flink_hive_integration_exp_1
    flink.scan.startup.mode latest-offset
    flink.schema.0.data-type    BIGINT
    flink.schema.0.name     ts
    flink.schema.1.data-type    BIGINT
    flink.schema.1.name     userId
    flink.schema.10.data-type   TIMESTAMP(3)
    flink.schema.10.expr    TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))
    flink.schema.10.name    eventTime
    flink.schema.2.data-type    VARCHAR(2147483647)
    flink.schema.2.name     eventType
    # 略......
    flink.schema.9.data-type    TIMESTAMP(3) NOT NULL
    flink.schema.9.expr     PROCTIME()
    flink.schema.9.name     procTime
    flink.schema.watermark.0.rowtime    eventTime
    flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
    flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '15' SECOND
    flink.topic             ods_analytics_access_log
    is_generic              true
    transient_lastDdlTime   1594808709

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
    serialization.format    1
Time taken: 1.797 seconds, Fetched: 61 row(s)

創(chuàng)建Hive表

Flink SQL提供了兼容HiveQL風(fēng)格的DDL,指定SqlDialect.HIVE即可(DML兼容還在開發(fā)中)宣增。

為了方便觀察結(jié)果玫膀,以下的表采用了天/小時(shí)/分鐘的三級分區(qū),實(shí)際應(yīng)用中可以不用這樣細(xì)的粒度(10分鐘甚至1小時(shí)的分區(qū)可能更合適)爹脾。

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")

tableEnv.executeSql(
  """
    |CREATE TABLE hive_tmp.analytics_access_log_hive (
    |  ts BIGINT,
    |  user_id BIGINT,
    |  event_type STRING,
    |  from_type STRING,
    |  column_type STRING,
    |  site_id BIGINT,
    |  groupon_id BIGINT,
    |  partner_id BIGINT,
    |  merchandise_id BIGINT
    |) PARTITIONED BY (
    |  ts_date STRING,
    |  ts_hour STRING,
    |  ts_minute STRING
    |) STORED AS PARQUET
    |TBLPROPERTIES (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 min',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
    |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
    |)
  """.stripMargin
)

Hive表的參數(shù)復(fù)用了SQL FileSystem Connector的相關(guān)參數(shù)帖旨,與分區(qū)提交(partition commit)密切相關(guān)。僅就上面出現(xiàn)的4個(gè)參數(shù)簡單解釋一下灵妨。

  • sink.partition-commit.trigger:觸發(fā)分區(qū)提交的時(shí)間特征解阅。默認(rèn)為processing-time,即處理時(shí)間泌霍,很顯然在有延遲的情況下货抄,可能會造成數(shù)據(jù)分區(qū)錯(cuò)亂。所以這里使用partition-time,即按照分區(qū)時(shí)間戳(即分區(qū)內(nèi)數(shù)據(jù)對應(yīng)的事件時(shí)間)來提交蟹地。
  • partition.time-extractor.timestamp-pattern:分區(qū)時(shí)間戳的抽取格式积暖。需要寫成yyyy-MM-dd HH:mm:ss的形式,并用Hive表中相應(yīng)的分區(qū)字段做占位符替換怪与。顯然呀酸,Hive表的分區(qū)字段值來自流表中定義好的事件時(shí)間,后面會看到琼梆。
  • sink.partition-commit.delay:觸發(fā)分區(qū)提交的延遲。在時(shí)間特征設(shè)為partition-time的情況下窿吩,當(dāng)水印時(shí)間戳大于分區(qū)創(chuàng)建時(shí)間加上此延遲時(shí)茎杂,分區(qū)才會真正提交。此值最好與分區(qū)粒度相同纫雁,例如若Hive表按1小時(shí)分區(qū)煌往,此參數(shù)可設(shè)為1 h,若按10分鐘分區(qū)轧邪,可設(shè)為10 min刽脖。
  • sink.partition-commit.policy.kind:分區(qū)提交策略,可以理解為使分區(qū)對下游可見的附加操作忌愚。metastore表示更新Hive Metastore中的表元數(shù)據(jù)曲管,success-file則表示在分區(qū)內(nèi)創(chuàng)建_SUCCESS標(biāo)記文件。

當(dāng)然硕糊,SQL FileSystem Connector的功能并不限于此院水,還有很大自定義的空間(如可以自定義分區(qū)提交策略以合并小文件等)。具體可參見官方文檔简十。

流式寫入Hive

注意將流表中的事件時(shí)間轉(zhuǎn)化為Hive的分區(qū)檬某。

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(
  """
    |INSERT INTO hive_tmp.analytics_access_log_hive
    |SELECT
    |  ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId,
    |  DATE_FORMAT(eventTime,'yyyy-MM-dd'),
    |  DATE_FORMAT(eventTime,'HH'),
    |  DATE_FORMAT(eventTime,'mm')
    |FROM stream_tmp.analytics_access_log_kafka
    |WHERE merchandiseId > 0
  """.stripMargin
)

來觀察一下流式Sink的結(jié)果吧。

上文設(shè)定的checkpoint interval是20秒螟蝙,可以看到恢恼,上圖中的數(shù)據(jù)文件恰好是以20秒的間隔寫入的。由于并行度為3胰默,所以每次寫入會生成3個(gè)文件场斑。分區(qū)內(nèi)所有數(shù)據(jù)寫入完畢后,會同時(shí)生成_SUCCESS文件牵署。如果是正在寫入的分區(qū)和簸,則會看到.inprogress文件。

通過Hive查詢一下碟刺,確定數(shù)據(jù)的時(shí)間無誤锁保。

hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT)))
    > FROM hive_tmp.analytics_access_log_hive
    > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';
OK
2020-07-15 23:23:00 2020-07-15 23:23:59
Time taken: 1.115 seconds, Fetched: 1 row(s)

流式讀取Hive

要將Hive表作為流式Source,需要啟用dynamic table options,并通過table hints來指定Hive數(shù)據(jù)流的參數(shù)爽柒。以下是簡單地通過Hive計(jì)算商品PV的例子吴菠。

tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)

val result = tableEnv.sqlQuery(
  """
     |SELECT merchandise_id,count(1) AS pv
     |FROM hive_tmp.analytics_access_log_hive
     |/*+ OPTIONS(
     |  'streaming-source.enable' = 'true',
     |  'streaming-source.monitor-interval' = '1 min',
     |  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
     |) */
     |WHERE event_type = 'shtOpenGoodsDetail'
     |AND ts_date >= '2020-07-15'
     |GROUP BY merchandise_id
     |ORDER BY pv DESC LIMIT 10
   """.stripMargin
)

result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()

三個(gè)table hint參數(shù)的含義解釋如下。

  • streaming-source.enable:設(shè)為true浩村,表示該Hive表可以作為Source做葵。
  • streaming-source.monitor-interval:感知Hive表新增數(shù)據(jù)的周期,以上設(shè)為1分鐘心墅。對于分區(qū)表而言酿矢,則是監(jiān)控新分區(qū)的生成,以增量讀取數(shù)據(jù)怎燥。
  • streaming-source.consume-start-offset:開始消費(fèi)的時(shí)間戳瘫筐,同樣需要寫成yyyy-MM-dd HH:mm:ss的形式。

更加具體的說明仍然可參見官方文檔(吐槽一句铐姚,這份文檔的Chinglish味道真的太重了=策肝。=

最后,由于SQL語句中有ORDER BY和LIMIT邏輯隐绵,所以需要調(diào)用toRetractStream()方法轉(zhuǎn)化為回撤流之众,即可輸出結(jié)果。

The End

Flink 1.11的Hive Streaming功能大大提高了Hive數(shù)倉的實(shí)時(shí)性依许,對ETL作業(yè)非常有利棺禾,同時(shí)還能夠滿足流式持續(xù)查詢的需求,具有一定的靈活性峭跳。

還有事情要做帘睦,民那晚安。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末坦康,一起剝皮案震驚了整個(gè)濱河市竣付,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌滞欠,老刑警劉巖古胆,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異筛璧,居然都是意外死亡逸绎,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進(jìn)店門夭谤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來棺牧,“玉大人,你說我怎么就攤上這事朗儒〖粘耍” “怎么了参淹?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長乏悄。 經(jīng)常有香客問我浙值,道長,這世上最難降的妖魔是什么檩小? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任开呐,我火速辦了婚禮,結(jié)果婚禮上规求,老公的妹妹穿的比我還像新娘筐付。我一直安慰自己,他們只是感情好阻肿,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布瓦戚。 她就那樣靜靜地躺著,像睡著了一般冕茅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蛹找,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天姨伤,我揣著相機(jī)與錄音,去河邊找鬼庸疾。 笑死乍楚,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的届慈。 我是一名探鬼主播徒溪,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼金顿!你這毒婦竟也來了臊泌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤揍拆,失蹤者是張志新(化名)和其女友劉穎渠概,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嫂拴,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡播揪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了筒狠。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猪狈。...
    茶點(diǎn)故事閱讀 38,716評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖辩恼,靈堂內(nèi)的尸體忽然破棺而出雇庙,到底是詐尸還是另有隱情谓形,我是刑警寧澤,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布状共,位于F島的核電站套耕,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏峡继。R本人自食惡果不足惜冯袍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望碾牌。 院中可真熱鬧康愤,春花似錦、人聲如沸舶吗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽誓琼。三九已至检激,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間腹侣,已是汗流浹背叔收。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留傲隶,地道東北人饺律。 一個(gè)月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像跺株,于是被迫代替她去往敵國和親复濒。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評論 2 350