SQL的很多概念無法直接映射到流計算,這就是在流計算上定義SQL的難點“仓澹
為了在流計算上定義SQL,我們需要引入幾個概念艇炎。既然批處理需要定義SQL表的概念酌伊,那在流計算上也需要表的概念,我們需要將傳統(tǒng)靜態(tài)表擴展成動態(tài)表缀踪,所謂動態(tài)表就是數據會隨時間而不斷變化的表居砖。此時,我們發(fā)現流和動態(tài)表之間有一種對偶性驴娃,也就是說流和動態(tài)表可以相互轉換奏候。將流的每條數據插入到數據庫中,就得到了一張表;同時我們可以抽取動態(tài)表的changelog還原原始流唇敞。
從流計算到SQL蔗草,我們可以把它看成是連續(xù)查詢咒彤。連續(xù)查詢區(qū)別于傳統(tǒng)的批處理查詢,需要源源不斷地接收數據咒精,每收到一條新數據就會更新結果且結果也是一張動態(tài)表镶柱,那結果的動態(tài)表又可以作為下一個查詢的輸入,從而串起整個流計算模叙⌒穑 基于上述兩個概念,我們可以在SQL上定義流計算向楼。但是查吊,流計算中的數據需要不斷修正和更新,因此這些數據下發(fā)后可能導致最終結果的錯誤湖蜕,我們需要把這些錯誤數據進行修正逻卖,這就涉及到流計算中一個非常重要的概念——Retraction。
為了解釋此概念昭抒,我們舉一個簡單的例子评也,上圖所示有一個點擊輸入流,它具備兩個字段:user和url灭返,經過第一個查詢根據用戶進行分組盗迟,統(tǒng)計每個用戶的點擊次數;進入第二個查詢,根據點擊次數進行分組熙含,統(tǒng)計每個次數的具體點擊人數罚缕。最終,我們會收到兩條記錄怎静,點擊次數所對應的人數邮弹。從結果明顯可以看出計算有誤,Mary的數據并沒有合并計數蚓聘,這就需要引入修正的概念腌乡。
如上圖所示,經過修正之后夜牡,經過第二個查詢時与纽,Mary的總查詢次數會被合并計算,Mary 1的結果會被告知撤回塘装,從而輸出正確的結果急迂,這就是引入Retraction的作用。在整個過程中蹦肴,是否觸發(fā)Retraction以及發(fā)送方式均由優(yōu)化器決定袋毙,用戶對整個過程是無感知的∪哂龋 在此基礎上听盖,我們發(fā)現世界不需要所謂的Stream SQL語法,標準的ANSI SQL就可用來定義流計算裂七,Flink SQL就是標準的ANSI SQL語法皆看。其部分核心功能如下:DDL用來定義數據源表、數據結構表;UDF背零、UDTF腰吟、UDAF用戶自定義函數,可以定制化用戶復雜的業(yè)務需求;JOIN是一個比較復雜的功能徙瓶,包括流與流之間的Join毛雇,流與表之間的Join以及Windows Join等;聚合功能包括類似Group AGG,Windoes Agg以及Over Agg等侦镇。
接下來我會結合實例對核心功能進行介紹灵疮。首先是裝載數據,需要create table語法壳繁。如上圖所示震捣,我們先定義一張clicks表,然后定義表的schema闹炉、user蒿赢、cTime以及url,with里是表的一系列屬性渣触,它是一個來自kafka的日志表羡棵,我們可以用SELECT * FROM clicks查詢轉載表里面的數據。
如果要將上述查詢數據寫到某個表中嗅钻,我們需要用create table定義結果表皂冰,語法同上,創(chuàng)建一張 last_clicks 結果表啊犬,主鍵是user灼擂,通過INSERT INTO 語法將上述查詢數據插入Mysql表中。
如果想把中間處理結果同時寫入多個存儲觉至,比如把數據處理結果同時寫到Mysql和HBase剔应,如上使用CREATE VIEW 定義一個來自淘寶的點擊記錄,同時連續(xù)寫多個INSERT INTO到Mysql和HBase语御。
接下來是Group Aggregate峻贮,也就是無限流量聚合。所謂無限流量聚合指從歷史開始到現在的所有用戶點擊數據应闯,如上查詢展示的是根據用戶分組纤控,然后統(tǒng)計點擊次數。如果來了一條Mary1的數據碉纺,我們就先插入該數據船万,后續(xù)如果Mary再次進行點擊刻撒,我們就在原數據基礎上進行修改更新,以此類推耿导。
Window Aggregate是定義在窗口上的聚合声怔,有別于上述無限流聚合,它的原理是是每個窗口對應輸出一個結果舱呻,比如每小時每個用戶的點擊次數醋火,需要在group by的結果上加上endT數據,也就是窗口標識箱吕。
接下來介紹雙流join芥驳,目前我們支持INNER, LEFT, RIGHT, FULL, SEMI, ANTI等Join類型,舉例說明雙流Join的主要使用場景茬高,比如把主流打成寬表兆旬,并補上額外字段等。如上圖所示雅采,我們需要將訂單和物流表信息進行Join操作爵憎,在Join的物理實現上會有兩份狀態(tài),用來存儲兩條流到目前為止收到的所有歷史數據,淘汰機制時間設定為一天半一次婚瓜。兩者中任何一方信息延遲都會先在表中等待宝鼓,直到同一個訂單的信息與物流關聯(lián)之后才會通過Join輸出。
維表Join與雙流Join類似巴刻,目前支持INNER, LEFT兩種交易類型愚铡。維表Join的使用同樣為補全主流,但想補全的字段在另一維表中胡陪。如上圖所示沥寥,使用時首先需要通過CREATE TABLE 語法定義一張維表,此處定義的是 Products 表柠座,存儲與產品相關信息邑雅,查詢同樣使用Join語法。Order與Products表通過Products ID實現Join妈经。關鍵字PERIOD FOR SYSTEM_TIME 是 SQL 2.11標準里的語法淮野,意思是當前關聯(lián)的Products是當前時刻的信息,關聯(lián)之后不再更新信息吹泡。上圖右側展示的是維表Join物理執(zhí)行的概念骤星。我們可以根據Order去Products數據庫里查詢信息,最終Products維表返回關聯(lián)信息爆哑《茨眩 核心功能如上所述,接下來主要聊優(yōu)化揭朝。維表中队贱,訂單O1查詢時是堵塞等待IO的狀態(tài)色冀,此時無論如何調優(yōu)性能,吞吐量和CPU使用率都上不去柱嫌,因此我們引入異步IO功能呐伞。
如上左半部分為未引入異步IO時的狀態(tài),如上右半部分為引入后慎式,此時若發(fā)起A請求,不需等待IO就可立刻發(fā)起B(yǎng)CD查詢請求趟径,然后異步等待返回結果瘪吏。返回ABCD以后再管理輸出,極大地提高了整體性能蜗巧。
如上掌眠,異步IO使用時與維表Join只有一行配置改動,對于用戶來說幕屹,這個使用是非常簡便的蓝丙。
第二個優(yōu)化是大數據中的常見場景——數據傾斜。如上為改進之前望拖,紅色聚合節(jié)點出現數據積壓現象渺尘,而紫色節(jié)點相對較空。
如果持續(xù)一段時間说敏,紅色聚合節(jié)點就會被打滿鸥跟,從而變?yōu)闊狳c,所有上游map節(jié)點就會反壓盔沫,停止處理數據進入等待狀態(tài)医咨,而下游的紫色節(jié)點基本處于空閑狀態(tài)。
我們引入Local-Global 聚合優(yōu)化架诞。左圖是未優(yōu)化拓撲圖拟淮,右邊是引入Local-Global優(yōu)化后的圖,我們在Map后引入Local Agg節(jié)點谴忧,Map與Local Agg是鏈在一起的一個線程很泊,之間的數據傳輸沒有任何網絡開銷。Local Agg可以將收到的數據按照 key進行預聚合俏蛮,然后將結果按照 key分發(fā)給下游Global Agg進行匯總撑蚌。 假如每個Map的 TPS 是每秒1萬的數據量搏屑,全局就2個 key:紅色和紫色争涌。如果 Local Agg聚合的間隔是每秒鐘一次,那么每個Local Agg能將1萬條數據預聚合成最多2條(全局共2個 key)辣恋。那么Global Agg每秒鐘最多收到只會三條消息亮垫,能有效降低Global Agg 的熱點模软。優(yōu)化后,我們對此進行性能測試饮潦,發(fā)現Local-Global 可以帶來超過20倍的性能提升燃异。因此,整個方案是十分有效的继蜡。