溫故知新:初識(shí)flink

本文為學(xué)習(xí)筆記磕瓷,會(huì)隨著學(xué)習(xí)深入持續(xù)更新膘螟,僅供參考

1笙僚、什么是flink
Flink是一個(gè)批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架和分布式處理引擎芳肌,可以實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)采集、實(shí)時(shí)計(jì)算和下游發(fā)送味咳。


2庇勃、實(shí)現(xiàn)的原理和基本模塊組成
flink實(shí)現(xiàn)一個(gè)簡(jiǎn)單程序的編程邏輯包括三部分:source、transform和sink槽驶。

編程模型.png

通常我們處理大數(shù)據(jù)的場(chǎng)景是在集群下完成的责嚷,flink的集群模型是這樣工作的:

  1. 用戶在提交編寫(xiě)好的 flink 工程時(shí),會(huì)先創(chuàng)建一個(gè)client再進(jìn)行提交掂铐,client將這個(gè)job提交給集群的JobManager罕拂;
  2. JobManager將job調(diào)度到一個(gè)TaskManager
  3. TaskManager實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的 Worker,在其上執(zhí)行 flink job 的一組 task全陨,一個(gè)taskManager就是一個(gè)jvm進(jìn)程爆班,可以用獨(dú)立的線程來(lái)執(zhí)行task,而一組task就包含:soruce辱姨、transform和sink柿菩;
  4. 執(zhí)行完的task會(huì)將狀態(tài)/結(jié)果返回client.
    詳細(xì)解釋參考鏈接
    集群的模型.png

3、常用API
flink建立初的目標(biāo)是流批一體的處理引擎雨涛,它可以處理有界數(shù)據(jù)和無(wú)界數(shù)據(jù)枢舶。
有界數(shù)據(jù)集懦胞,就比如:文件、表凉泄、java集合躏尉;適用于DataSet api;
無(wú)界數(shù)據(jù)流后众,就比如:kafka持續(xù)存在的數(shù)據(jù)流胀糜;適用于DataStream api;
還有其他的如處理機(jī)器學(xué)習(xí)的Machine Learning api;
在上層處理flink提供了可供場(chǎng)景選擇的api,內(nèi)部的處理邏輯是一樣的。

架構(gòu).png

常用的流式處理DataStream api的處理方法可以參考:Flink DataStream API 編程指南
Flink 提供了兩種關(guān)系型 API 來(lái)做流批統(tǒng)一處理:Table API 和 SQL合溺,無(wú)論是流數(shù)據(jù)還是歷史數(shù)據(jù)查库,使用Table API和SQL都能夠保證查詢的一致性和準(zhǔn)確性,更適合做數(shù)據(jù)的實(shí)時(shí)分析。
Table API 和 SQL直達(dá)鏈接


4、flink如何保持高效率,高可靠和狀態(tài)一致性痊臭?常見(jiàn)問(wèn)題有哪些
a)保持?jǐn)?shù)據(jù)一致性
Checkpoint:Flink通過(guò)定期在作業(yè)的數(shù)據(jù)流中插入checkpoint,將作業(yè)的狀態(tài)保存到持久化存儲(chǔ)登夫。這樣广匙,在發(fā)生故障時(shí),作業(yè)可以從故障之前的狀態(tài)恢復(fù)恼策,避免了數(shù)據(jù)重復(fù)處理或丟失鸦致。
Exactly-Once Sink:Flink中的Sink操作符保證僅將一條記錄寫(xiě)入外部系統(tǒng),即使在發(fā)生故障時(shí)也不會(huì)重復(fù)寫(xiě)入相同的記錄涣楷。這通過(guò)將Sink操作符的輸出與應(yīng)用的狀態(tài)一起保存在持久化存儲(chǔ)中實(shí)現(xiàn)分唾。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 開(kāi)啟 checkpoint,每5000毫秒進(jìn)行一次,如果作業(yè)失敗,它會(huì)從最近的一次 checkpoint 恢復(fù)狮斗,保證數(shù)據(jù)一致性
        env.enableCheckpointing(5000); env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

image.png

b)反壓?jiǎn)栴}(消費(fèi)速度小于生產(chǎn)速度)
反壓的判斷:TaskManager會(huì)定期進(jìn)行反壓檢測(cè)绽乔,然后將結(jié)果返回給JobManager,JobManager在進(jìn)行整體比例計(jì)算碳褒,結(jié)果反應(yīng)在監(jiān)控指標(biāo)上
Flink metrics的官網(wǎng)文檔鏈接
image.png

問(wèn)題處理:
分析方式一.png

分析方式二.png

分析方式三.png

d) 數(shù)據(jù)傾斜(某個(gè)節(jié)點(diǎn)壓力極大折砸,導(dǎo)致任務(wù)失敗)
產(chǎn)生原因:業(yè)務(wù)有嚴(yán)重的熱點(diǎn)數(shù)據(jù)沙峻,錯(cuò)誤使用keyBy/GroupBy睦授,錯(cuò)誤的使用了分組的key,認(rèn)為生成熱點(diǎn)數(shù)據(jù)摔寨;flink消費(fèi)kafka數(shù)據(jù)時(shí)上下游并行度不一致(推薦:kafka的分區(qū)數(shù)和flink consumer的并行度一致)
解決:避免熱點(diǎn)key的設(shè)計(jì)去枷;自定義分區(qū)策略
避免熱點(diǎn)key的設(shè)計(jì)解決思路.png

分區(qū)策略.png

e) 并行度設(shè)置
image.png

f) 如何做維表關(guān)聯(lián)查詢
場(chǎng)景:在處理流數(shù)據(jù)的時(shí)候需要用到數(shù)據(jù)庫(kù)表的數(shù)據(jù)(如:將訂單數(shù)據(jù)中省份id轉(zhuǎn)化成省份數(shù)據(jù)等)
解決:1、小數(shù)據(jù)量的維表可以進(jìn)行實(shí)時(shí)查詢,但是需要考慮用線程池删顶,妥善處理外部線程連接疗隶,及時(shí)關(guān)閉。
2翼闹、預(yù)加載維表全量數(shù)據(jù)。3蒋纬、LRU緩存(冷熱數(shù)據(jù))


5猎荠、在springboot中如何實(shí)現(xiàn)數(shù)據(jù)轉(zhuǎn)換集成
應(yīng)用場(chǎng)景:接收kafka數(shù)據(jù)入庫(kù)hdfs/doris,實(shí)時(shí)寫(xiě)入蜀备。
參考代碼邏輯如下

 <!--核心依賴-->
   <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>你的Flink版本</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>你的Flink版本</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>你的Flink版本</version>
    </dependency>
 <!--外部鏈接依賴-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>你的Flink版本</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>你的Flink版本</version>
    </dependency>
    <!-- 如果需要寫(xiě)入Hadoop FileSystem关摇,添加以下依賴 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem</artifactId>
        <version>你的Flink版本</version>
    </dependency>
    <!-- 如果需要寫(xiě)入Hadoop HDFS,確保你的Hadoop版本兼容碾阁,并添加相應(yīng)依賴 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>你的Hadoop版本</version>
    </dependency>
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
import java.io.IOException;
import java.util.Properties;
 
public class FlinkKafkaTLVToHadoop {
 
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // 配置Kafka消費(fèi)者
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");
        kafkaProps.setProperty("group.id", "my-group");
        kafkaProps.setProperty("auto.offset.reset", "earliest");
 
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
            "my-input-topic",
            new SimpleStringSchema(),
            kafkaProps
        );
 
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
 
        // 解析TLV數(shù)據(jù)
        DataStream<MyParsedData> parsedDataStream = kafkaStream
            .process(new TLVParserFunction()); // 實(shí)現(xiàn)自定義的解析函數(shù)
 
        // 寫(xiě)入Hadoop HDFS
        parsedDataStream.addSink(new HadoopFsSink()); // 實(shí)現(xiàn)自定義的Hadoop Sink
 
        env.execute("Flink Kafka TLV to Hadoop Job");
    }
 
    // 自定義的Hadoop Fs Sink
    public static class HadoopFsSink implements SinkFunction<MyParsedData> {
        private FileSystem fileSystem;
 
        @Override
        public void invoke(MyParsedData value) throws IOException {
            if (fileSystem == null) {
                fileSystem = FileSystem.get(new java.net.URI("hdfs://namenode:8020"), new Configuration());
            }
            fileSystem.write(new Path("/path/to/hdfs/file"), value.toString());
        }
    }
 
    // 自定義的解析函數(shù)
    public static class TLVParserFunction extends RichMapFunction<String, MyParsedData> {
        @Override
        public MyParsedData map(String tlv) throws Exception {
            // 實(shí)現(xiàn)具體的解析邏輯
            return new MyParsedData(/* 解析后的數(shù)據(jù) */);
        }
    }
 
    // 自定義的數(shù)據(jù)類型输虱,表示解析后的數(shù)據(jù)
    public static class MyParsedData {
        // 字段和構(gòu)造函數(shù)
    }
}

參考文件
1、Flink基本原理
2脂凶、官網(wǎng)文檔
3宪睹、Flink學(xué)習(xí)教程-文檔
4、Flink學(xué)習(xí)教程-視頻
5蚕钦、Flink常用API詳解

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末亭病,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子嘶居,更是在濱河造成了極大的恐慌罪帖,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件邮屁,死亡現(xiàn)場(chǎng)離奇詭異整袁,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)佑吝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門坐昙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人迹蛤,你說(shuō)我怎么就攤上這事民珍。” “怎么了盗飒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵嚷量,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我逆趣,道長(zhǎng)蝶溶,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮抖所,結(jié)果婚禮上梨州,老公的妹妹穿的比我還像新娘。我一直安慰自己田轧,他們只是感情好暴匠,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著傻粘,像睡著了一般每窖。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上弦悉,一...
    開(kāi)封第一講書(shū)人閱讀 49,784評(píng)論 1 290
  • 那天窒典,我揣著相機(jī)與錄音,去河邊找鬼稽莉。 笑死瀑志,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的污秆。 我是一名探鬼主播劈猪,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼良拼!你這毒婦竟也來(lái)了岸霹?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤将饺,失蹤者是張志新(化名)和其女友劉穎贡避,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體予弧,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡刮吧,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了掖蛤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片杀捻。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蚓庭,靈堂內(nèi)的尸體忽然破棺而出致讥,到底是詐尸還是另有隱情,我是刑警寧澤器赞,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布垢袱,位于F島的核電站,受9級(jí)特大地震影響港柜,放射性物質(zhì)發(fā)生泄漏请契。R本人自食惡果不足惜咳榜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望爽锥。 院中可真熱鬧涌韩,春花似錦、人聲如沸氯夷。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)腮考。三九已至擎淤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間秸仙,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工桩盲, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留寂纪,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓赌结,卻偏偏與公主長(zhǎng)得像捞蛋,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子柬姚,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容