Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing.
Flink’s SQL support is based on Apache Calcite, which implements the SQL standard.
Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.
Concepts & Common API
Streaming Table API & SQL
Relational Queries on Data Streams
Differences
| Relational Algebra / SQL | Stream Processing |
| --- | --- |
| Relations (or tables) are bounded (multi-)sets of tuples. | A stream is an infinite sequence of tuples. |
| A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data. | A streaming query cannot access all data when it is started and has to "wait" for data to be streamed in. |
| A batch query terminates after it produces a fixed-sized result. | A streaming query continuously updates its result based on the received records and never completes. |
Relationship
- A database table is the result of a stream of INSERT, UPDATE, and DELETE DML statements, often called changelog stream.
- A materialized view is defined as a SQL query. In order to update the view, the query continuously processes the changelog streams of the view’s base relations.
- The materialized view is the result of the streaming SQL query.
Dynamic Tables & Continuous Queries
Dynamic tables are the core concept of Flink’s Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time.
[Figure: stream → dynamic table → continuous query → result stream]
- A stream is converted into a dynamic table.
- A continuous query is evaluated on the dynamic table yielding a new dynamic table.
- The resulting dynamic table is converted back into a stream.
Defining a Table on a Stream
Essentially, we are building a table from an INSERT-only changelog stream.
The following figure visualizes how a stream of click events (left-hand side) is converted into a table (right-hand side). The resulting table grows continuously as more records of the click stream are inserted.
[Figure: a stream of click events (left) converted into a continuously growing table (right)]
Note: a table that is defined on a stream is not materialized internally.
Continuous Queries
A continuous query is evaluated on a dynamic table and produces a new dynamic table as its result. In contrast to a batch query, a continuous query never terminates and updates its result table according to the updates on its input tables. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables.
[Figure: a continuous GROUP BY count query on the clicks table]
When the query is started, the clicks table (left-hand side) is empty. The query starts to compute the result table, when the first row is inserted into the clicks table. After the first row [Mary, ./home] was inserted, the result table (right-hand side, top) consists of a single row [Mary, 1]. When the second row [Bob, ./cart] is inserted into the clicks table, the query updates the result table and inserts a new row [Bob, 1]. The third row [Mary, ./prod?id=1] yields an update of an already computed result row such that [Mary, 1] is updated to [Mary, 2]. Finally, the query inserts a third row [Liz, 1] into the result table, when the fourth row is appended to the clicks table.
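The incremental behaviour described above can be sketched outside of Flink with a few lines of plain Python. This is an illustrative simulation of a `GROUP BY user` count query, not Flink API; the `clicks` data mirrors the rows in the example:

```python
from collections import OrderedDict

def continuous_count(clicks):
    """Simulate a continuous 'SELECT user, COUNT(url) ... GROUP BY user' query:
    each inserted click row either inserts a new result row or updates an
    existing one, exactly as in the step-by-step walkthrough above."""
    result = OrderedDict()  # result table: user -> count
    history = []            # snapshot of the result table after each insert
    for user, url in clicks:
        result[user] = result.get(user, 0) + 1
        history.append(dict(result))
    return history

clicks = [("Mary", "./home"), ("Bob", "./cart"),
          ("Mary", "./prod?id=1"), ("Liz", "./home")]
for snapshot in continuous_count(clicks):
    print(snapshot)
```

The final snapshot contains `Mary → 2`, `Bob → 1`, and `Liz → 1`, matching the result table in the example.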
[Figure: a continuous hourly tumbling-window query on the clicks table]
As before, the input table clicks is shown on the left. The query continuously computes results every hour and updates the result table. The clicks table contains four rows with timestamps (cTime) between 12:00:00 and 12:59:59. The query computes two result rows from this input (one for each user) and appends them to the result table. For the next window between 13:00:00 and 13:59:59, the clicks table contains three rows, which results in another two rows being appended to the result table. The result table is updated as more rows are appended to clicks over time.
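The hourly windowed variant can be sketched the same way. This is again an illustrative simulation rather than Flink API; cTime is simplified to a plain seconds-since-midnight integer, and the result is keyed by (window start, user):

```python
from collections import Counter

def hourly_window_counts(clicks):
    """Group clicks into one-hour tumbling windows keyed by (window_start, user).
    Each finished window appends new rows to the result table instead of
    updating previously emitted rows."""
    counts = Counter()
    for c_time, user, url in clicks:
        window_start = c_time - (c_time % 3600)  # truncate timestamp to the hour
        counts[(window_start, user)] += 1
    return dict(counts)

# Timestamps in seconds: three clicks in the 12:00 hour, one in the 13:00 hour.
clicks = [
    (12 * 3600 + 5,  "Mary", "./home"),
    (12 * 3600 + 60, "Bob",  "./cart"),
    (12 * 3600 + 90, "Mary", "./prod?id=1"),
    (13 * 3600 + 10, "Bob",  "./home"),
]
print(hourly_window_counts(clicks))
```

Unlike the non-windowed count query, each window's rows are final once the window closes: later clicks only ever add rows for later windows.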
Query Restrictions
State Size: Continuous queries often run for weeks or months. The state a query must maintain in order to update its result can therefore grow very large, and a query can eventually fail if its state exceeds the available storage.
Computing Updates: Some queries have to recompute and update a large fraction of the emitted result rows even if only a single input record is added, which makes them too expensive to run as continuous queries.
Table to Stream Conversion
A dynamic table can be continuously modified by INSERT, UPDATE, and DELETE changes just like a regular database table. It might be a table with a single row, which is constantly updated, an insert-only table without UPDATE and DELETE modifications, or anything in between.
- Append-only stream: A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows.
- Retract stream: A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT change as an add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the updated (previous) row and an add message for the updating (new) row.
- Upsert stream: An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with a unique key is converted into an upsert stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The stream-consuming operator needs to be aware of the unique key attribute in order to apply the messages correctly. The main difference to a retract stream is that UPDATE changes are encoded with a single message and are hence more efficient.
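The two encodings can be made concrete with a small simulation. This is an illustrative sketch, not Flink's internal representation; the `"+"`/`"-"` markers and the changelog tuple format are assumptions made for the example:

```python
def to_retract_stream(changes):
    """Encode changelog entries as a retract stream:
    INSERT -> one add message, DELETE -> one retract message,
    UPDATE -> a retract message for the old row plus an add message
    for the new row (two messages per update)."""
    out = []
    for op, *rows in changes:
        if op == "INSERT":
            out.append(("+", rows[0]))
        elif op == "DELETE":
            out.append(("-", rows[0]))
        elif op == "UPDATE":               # rows = [old_row, new_row]
            out.append(("-", rows[0]))
            out.append(("+", rows[1]))
    return out

def to_upsert_stream(changes, key):
    """Encode changelog entries as an upsert stream on a unique key:
    INSERT and UPDATE each become a single upsert message,
    DELETE becomes a delete message carrying only the key."""
    out = []
    for op, *rows in changes:
        row = rows[-1]                     # for UPDATE, the new row
        if op == "DELETE":
            out.append(("delete", key(row)))
        else:
            out.append(("upsert", row))
    return out

changes = [
    ("INSERT", ("Mary", 1)),
    ("UPDATE", ("Mary", 1), ("Mary", 2)),  # old row, new row
    ("DELETE", ("Mary", 2)),
]
print(to_retract_stream(changes))
print(to_upsert_stream(changes, key=lambda row: row[0]))
```

Note how the UPDATE produces two messages in the retract stream but only one in the upsert stream, which is the efficiency difference described above.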
Time Attributes
Flink supports three notions of time:
1. Processing time: the wall-clock time of the machine that processes the record.
2. Event time: the time at which an event actually occurred, usually based on a timestamp carried by each row.
3. Ingestion time: the time at which an event enters Flink; it is usually treated similarly to event time.
Processing Time
During DataStream-to-Table Conversion
Using a TableSource
Event Time
During DataStream-to-Table Conversion
There are two ways of defining the time attribute when converting a DataStream into a Table:
- Extending the physical schema by an additional logical field
- Replacing a physical field by a logical field (e.g. because it is no longer needed after timestamp extraction).
Using a TableSource
The event time attribute is defined by a TableSource that implements the DefinedRowtimeAttribute interface. The logical time attribute is appended to the physical schema defined by the return type of the TableSource.
Timestamps and watermarks must be assigned in the stream that is returned by the getDataStream() method.
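The relationship between timestamps and watermarks can be illustrated with a bounded-out-of-orderness generator, sketched here in plain Python rather than Flink's assigner API; the 2-second bound is an arbitrary assumption for the example:

```python
def with_watermarks(events, max_out_of_orderness=2):
    """Attach a watermark after each event: the highest timestamp seen so far
    minus the allowed out-of-orderness. A watermark w asserts that no further
    events with timestamp <= w are expected."""
    max_ts = float("-inf")
    out = []
    for ts, payload in events:
        max_ts = max(max_ts, ts)
        watermark = max_ts - max_out_of_orderness
        out.append((ts, payload, watermark))
    return out

# Event "c" arrives out of order; the watermark never moves backwards.
events = [(1, "a"), (4, "b"), (3, "c"), (7, "d")]
for ts, payload, wm in with_watermarks(events):
    print(ts, payload, "watermark:", wm)
```

Event-time windows (such as the hourly windows above) are closed once the watermark passes their end timestamp, which is how a query decides that a window's result is final.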
Query Configuration
Table API
Update and Append Queries
The first example query can update previously emitted rows: the changelog stream that defines its result table contains INSERT and UPDATE changes. The second example query only appends to the result table: its changelog stream consists of INSERT changes only.
SQL
SQL queries are specified with the sql() method of the TableEnvironment. The method returns the result of the SQL query as a Table. A Table can be used in subsequent SQL and Table API queries, be converted into a DataSet or DataStream, or written to a TableSink. SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.
In order to access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a TableSource, Table, DataStream, or DataSet. Alternatively, users can also register external catalogs in a TableEnvironment to specify the location of the data sources.
Note: Flink's SQL support is not yet feature complete. Queries that include unsupported SQL features cause a TableException. The features of SQL that are supported on batch and streaming tables are listed below.
Supported Syntax
Flink parses SQL using Apache Calcite, which supports standard ANSI SQL. DML and DDL statements are not supported by Flink.
Scan, Projection, and Filter
Aggregations
GroupBy Aggregation
GroupBy Window Aggregation
Over Window Aggregation
Group Windows
| Group Window Function | Description |
| --- | --- |
| TUMBLE(time_attr, interval) | Defines a tumbling (fixed-size) window. |
| HOP(time_attr, interval, interval) | Defines a hopping (sliding) window. It takes two interval parameters: the first defines the slide interval, the second defines the window size. |
| SESSION(time_attr, interval) | Defines a session window. Session windows have no fixed duration; their bounds are defined by an interval of inactivity. If no event arrives within the defined gap, the session window closes; otherwise the row is added to the existing window. |
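The three window types can be mimicked with small window-assignment functions. This is an illustrative sketch, not Flink's implementation; windows are represented as half-open `(start, end)` intervals, and the session variant is simplified to a single pass over sorted timestamps:

```python
def tumble(ts, size):
    """Assign a timestamp to its single tumbling window [start, start + size)."""
    start = ts - ts % size
    return (start, start + size)

def hop(ts, slide, size):
    """Return every hopping window [start, start + size) that contains ts.
    Window starts are multiples of `slide`, so a timestamp usually
    belongs to several overlapping windows."""
    windows = []
    start = ts - ts % slide          # latest window that can contain ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

def session(timestamps, gap):
    """Merge timestamps into session windows: a window is extended while
    consecutive timestamps are at most `gap` apart, and closes otherwise."""
    windows = []
    for ts in sorted(timestamps):
        if windows and ts - windows[-1][1] <= gap:
            windows[-1][1] = ts      # extend the current session
        else:
            windows.append([ts, ts]) # start a new session
    return [tuple(w) for w in windows]

print(tumble(7, 5))                  # one fixed window
print(hop(7, 2, 4))                  # overlapping windows
print(session([1, 2, 6, 7, 20], 3))  # gap-separated sessions
```

Note that tumbling windows assign each row to exactly one window, hopping windows to several, and session windows are derived from the data itself rather than from the clock.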
Time Attributes
- Processing time: the wall-clock time of the machine while the record is being processed.
- Event time: a timestamp carried by the record itself, e.g. assigned when the event was created.
- Ingestion time: the time at which an event arrives in Flink; it is treated similarly to event time.
| Auxiliary Function | Description |
| --- | --- |
| TUMBLE_START(time_attr, interval)<br>HOP_START(time_attr, interval, interval)<br>SESSION_START(time_attr, interval) | Returns the start timestamp of the corresponding tumbling, hopping, or session window. Commonly used to compute a window's boundaries. |
| TUMBLE_END(time_attr, interval)<br>HOP_END(time_attr, interval, interval)<br>SESSION_END(time_attr, interval) | Returns the end timestamp of the corresponding tumbling, hopping, or session window. |
Table Sources & Sinks
User-Defined Functions