前言
與DataStream同樣肋杖,官方在Flink SQL上也提供了很多連接器,今天來學(xué)習(xí)總結(jié)一下JDBC連接器
環(huán)境準(zhǔn)備
如果使用編碼挖函,需要引入兩個依賴包状植,F(xiàn)link提供的jdbc連接器依賴和和對應(yīng)的mysql驅(qū)動包,
以下為1.12.0 提供的jdbc連接器依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.12.0</version>
</dependency>
使用學(xué)習(xí)
The JDBC sink operate in upsert mode for exchange UPDATE/DELETE messages with the external system if a primary key is defined on the DDL, otherwise, it operates in append mode and doesn’t support to consume UPDATE/DELETE messages.
來自官網(wǎng)的一段介紹怨喘,簡單翻譯一下就是說津畸,如果DDL上定義了主鍵,則JDBC接收器以upsert模式操作與外部系統(tǒng)交互必怜,否則以append模型來進行交互肉拓。
本人測試了一下,詳細步驟就不寫了梳庆,和官網(wǎng)差不多暖途,測試主要步驟如下:
外部數(shù)據(jù)庫DDL設(shè)置主鍵,F(xiàn)link SQL DDL設(shè)置主鍵
外部數(shù)據(jù)庫DDL不設(shè)置主鍵膏执,F(xiàn)link SQL DDL設(shè)置主鍵
外部數(shù)據(jù)庫DDL設(shè)置主鍵驻售,F(xiàn)link SQL DDL不設(shè)置主鍵
外部數(shù)據(jù)庫DDL不設(shè)置主鍵,F(xiàn)link SQL DDL不設(shè)置主鍵
結(jié)果如下:
外部系統(tǒng)DDL有主鍵 | 外部系統(tǒng)DDL無主鍵 | |
---|---|---|
Flink SQL DDL 有主鍵 | upsert | append |
Flink SQL DDL 無主鍵 | upsert | append |
總結(jié):JDBC sink的操作時更米,如果外部系統(tǒng)定義的DDL存在主鍵欺栗,則JDBC連接器將使用upsert語義而不是簡單的insert,在Flink任務(wù)執(zhí)行中如果出現(xiàn)了故障,F(xiàn)link作業(yè)將會從上一個成功的檢查點恢復(fù)并重新處理迟几,這可能導(dǎo)致在恢復(fù)期間重新處理消息消请。
強烈建議使用upsert模式,因為使用append模式需要重新處理記錄类腮,下游可能會出現(xiàn)重復(fù)數(shù)據(jù)臊泰。
重要特性
Partitioned Scan
為了加速并行Source任務(wù)實例中的數(shù)據(jù)讀取,F(xiàn)link 在JDBC連接器中提供了scan.partition.column蚜枢,scan.partition.num因宇,scan.partition.lower-bound,scan.partition.upper-bound 這4個配置屬性祟偷。其原理簡單解釋一下:
如果一個線程去讀一張很大的表察滑,從頭讀到尾,肯定很慢修肠,如果想加快速度贺辰,自然能想到使用多線程。如何使用呢嵌施?
比如假設(shè)有id為0到100的數(shù)據(jù)饲化,id不連續(xù),并且實際上有50條記錄吗伤,如果我們希望分而治之吃靠,那么最好是五個線程,每個線程讀取10條數(shù)據(jù)足淆。但是肯定是做不到這么精確的巢块。最簡單的辦法是,產(chǎn)生五條如下的SQL:
select * from xxx where id<20;
select * from xxx where id>=20 and id <40;
select * from xxx where id>=40 and id <60;
select * from xxx where id>=60 and id <80;
select * from xxx where id>=80;
因為id中間有空隙巧号,所以每條SQL實際拿到的數(shù)據(jù)并不一樣族奢。但沒關(guān)系,通過五個線程執(zhí)行這五條SQL丹鸿,我們肯定可以通過更少的時間獲取到全量數(shù)據(jù)越走。
所以在分區(qū)掃描中,確定了4個屬性規(guī)則靠欢,用來并行的讀這些數(shù)據(jù)
scan.partition.column: 按照哪個列進行分區(qū)
scan.partition.num:分區(qū)數(shù)量
scan.partition.lower-bound:分區(qū)字段的最小值
scan.partition.upper-bound:分區(qū)字段的最大值
這個其實和之前博客中寫的過優(yōu)化SQL方法原理是一樣的廊敌,鏈接請看:
SQL優(yōu)化之使用數(shù)學(xué)的方式動態(tài)的確定區(qū)間并統(tǒng)計02
簡單來說就是,如果這個比如這樣(...,20),[20,40),[40,60),[60,80)....[80,...) 门怪。少還可以骡澈,如果面對上百組的情況,后續(xù)不容易維護薪缆,比如上面的例子秧廉,第一組的最小值是20,最后一組的最大值是80,然后總共5組,這樣Flink就知道20-80之間還要再分三組拣帽。
目前Flink 支持 數(shù)字疼电,日期,時間戳等類型的分區(qū)掃描配置减拭。
Lookup Cache
JDBC連接器在作為Source維度表使用時蔽豺,可以開啟緩存來提高臨時連接JDBC連接器的性能。
默認情況下拧粪,不啟用查找緩存修陡,因此所有請求都發(fā)送到外部數(shù)據(jù)庫。啟用查找緩存后可霎,每個進程(即TaskManager)將保存一個緩存魄鸦。Flink將首先查找緩存,并且僅在缺少緩存時才將請求發(fā)送到外部數(shù)據(jù)庫癣朗,并使用返回的行更新緩存拾因。當(dāng)緩存達到最大緩存行數(shù)lookup.cache.max-rows
或超過最大生存時間時,緩存中最舊的行將過期lookup.cache.ttl
旷余。緩存的行可能不是最新的绢记,用戶可以調(diào)整lookup.cache.ttl
為較小的值以獲取更好的新鮮數(shù)據(jù),但這可能會增加發(fā)送到數(shù)據(jù)庫的請求的數(shù)量正卧。因此蠢熄,這是吞吐量和正確性之間的平衡。
lookup.max-retries
則配置在查詢失敗后重試的次數(shù)
Buffer-flush
當(dāng)mysql被使用被Sink時炉旷,可以配置
sink.buffer-flush.max-rows
:配置刷新前緩沖記錄的最大大小
sink.buffer-flush.interval
:配置刷新間隔签孔,單位為毫秒,在此期間窘行,異步線程將刷新數(shù)據(jù)骏啰。可以設(shè)置為'0'來禁用它抽高。注意,”sink.buffer-flush.max-rows'可以設(shè)置為'0'判耕,并設(shè)置刷新間隔,以允許完成異步處理緩沖的動作翘骂。
總結(jié)
本文是本文學(xué)習(xí)和使用Flink SQL JDBC 連接器的學(xué)習(xí)筆記和總結(jié)壁熄,如果出現(xiàn)描述問題,歡迎大家留言指出碳竟,一起努力草丧。
-- by 兩只猴