本文為學(xué)習(xí)筆記磕瓷,會(huì)隨著學(xué)習(xí)深入持續(xù)更新膘螟,僅供參考
1笙僚、什么是flink
Flink是一個(gè)批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架和分布式處理引擎芳肌,可以實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)采集、實(shí)時(shí)計(jì)算和下游發(fā)送味咳。
2庇勃、實(shí)現(xiàn)的原理和基本模塊組成
flink實(shí)現(xiàn)一個(gè)簡(jiǎn)單程序的編程邏輯包括三部分:source、transform和sink槽驶。
通常我們處理大數(shù)據(jù)的場(chǎng)景是在集群下完成的责嚷,flink的集群模型是這樣工作的:
- 用戶在提交編寫(xiě)好的 flink 工程時(shí),會(huì)先創(chuàng)建一個(gè)client再進(jìn)行提交掂铐,client將這個(gè)job提交給集群的JobManager罕拂;
- JobManager將job調(diào)度到一個(gè)TaskManager
- TaskManager實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的 Worker,在其上執(zhí)行 flink job 的一組 task全陨,一個(gè)taskManager就是一個(gè)jvm進(jìn)程爆班,可以用獨(dú)立的線程來(lái)執(zhí)行task,而一組task就包含:soruce辱姨、transform和sink柿菩;
- 執(zhí)行完的task會(huì)將狀態(tài)/結(jié)果返回client.
詳細(xì)解釋參考鏈接
3、常用API
flink建立初的目標(biāo)是流批一體的處理引擎雨涛,它可以處理有界數(shù)據(jù)和無(wú)界數(shù)據(jù)枢舶。
有界數(shù)據(jù)集懦胞,就比如:文件、表凉泄、java集合躏尉;適用于DataSet api;
無(wú)界數(shù)據(jù)流后众,就比如:kafka持續(xù)存在的數(shù)據(jù)流胀糜;適用于DataStream api;
還有其他的如處理機(jī)器學(xué)習(xí)的Machine Learning api;
在上層處理flink提供了可供場(chǎng)景選擇的api,內(nèi)部的處理邏輯是一樣的。
常用的流式處理DataStream api的處理方法可以參考:Flink DataStream API 編程指南
Flink 提供了兩種關(guān)系型 API 來(lái)做流批統(tǒng)一處理:Table API 和 SQL合溺,無(wú)論是流數(shù)據(jù)還是歷史數(shù)據(jù)查库,使用Table API和SQL都能夠保證查詢的一致性和準(zhǔn)確性,更適合做數(shù)據(jù)的實(shí)時(shí)分析。
Table API 和 SQL直達(dá)鏈接
4、flink如何保持高效率,高可靠和狀態(tài)一致性痊臭?常見(jiàn)問(wèn)題有哪些
a)保持?jǐn)?shù)據(jù)一致性
Checkpoint:Flink通過(guò)定期在作業(yè)的數(shù)據(jù)流中插入checkpoint,將作業(yè)的狀態(tài)保存到持久化存儲(chǔ)登夫。這樣广匙,在發(fā)生故障時(shí),作業(yè)可以從故障之前的狀態(tài)恢復(fù)恼策,避免了數(shù)據(jù)重復(fù)處理或丟失鸦致。
Exactly-Once Sink:Flink中的Sink操作符保證僅將一條記錄寫(xiě)入外部系統(tǒng),即使在發(fā)生故障時(shí)也不會(huì)重復(fù)寫(xiě)入相同的記錄涣楷。這通過(guò)將Sink操作符的輸出與應(yīng)用的狀態(tài)一起保存在持久化存儲(chǔ)中實(shí)現(xiàn)分唾。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開(kāi)啟 checkpoint,每5000毫秒進(jìn)行一次,如果作業(yè)失敗,它會(huì)從最近的一次 checkpoint 恢復(fù)狮斗,保證數(shù)據(jù)一致性
env.enableCheckpointing(5000); env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
b)反壓?jiǎn)栴}(消費(fèi)速度小于生產(chǎn)速度)
反壓的判斷:TaskManager會(huì)定期進(jìn)行反壓檢測(cè)绽乔,然后將結(jié)果返回給JobManager,JobManager在進(jìn)行整體比例計(jì)算碳褒,結(jié)果反應(yīng)在監(jiān)控指標(biāo)上
Flink metrics的官網(wǎng)文檔鏈接
問(wèn)題處理:
d) 數(shù)據(jù)傾斜(某個(gè)節(jié)點(diǎn)壓力極大折砸,導(dǎo)致任務(wù)失敗)
產(chǎn)生原因:業(yè)務(wù)有嚴(yán)重的熱點(diǎn)數(shù)據(jù)沙峻,錯(cuò)誤使用keyBy/GroupBy睦授,錯(cuò)誤的使用了分組的key,認(rèn)為生成熱點(diǎn)數(shù)據(jù)摔寨;flink消費(fèi)kafka數(shù)據(jù)時(shí)上下游并行度不一致(推薦:kafka的分區(qū)數(shù)和flink consumer的并行度一致)
解決:避免熱點(diǎn)key的設(shè)計(jì)去枷;自定義分區(qū)策略
e) 并行度設(shè)置
f) 如何做維表關(guān)聯(lián)查詢
場(chǎng)景:在處理流數(shù)據(jù)的時(shí)候需要用到數(shù)據(jù)庫(kù)表的數(shù)據(jù)(如:將訂單數(shù)據(jù)中省份id轉(zhuǎn)化成省份數(shù)據(jù)等)
解決:1、小數(shù)據(jù)量的維表可以進(jìn)行實(shí)時(shí)查詢,但是需要考慮用線程池删顶,妥善處理外部線程連接疗隶,及時(shí)關(guān)閉。
2翼闹、預(yù)加載維表全量數(shù)據(jù)。3蒋纬、LRU緩存(冷熱數(shù)據(jù))
5猎荠、在springboot中如何實(shí)現(xiàn)數(shù)據(jù)轉(zhuǎn)換集成
應(yīng)用場(chǎng)景:接收kafka數(shù)據(jù)入庫(kù)hdfs/doris,實(shí)時(shí)寫(xiě)入蜀备。
參考代碼邏輯如下
<!--核心依賴-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>你的Flink版本</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>你的Flink版本</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>你的Flink版本</version>
</dependency>
<!--外部鏈接依賴-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>你的Flink版本</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>你的Flink版本</version>
</dependency>
<!-- 如果需要寫(xiě)入Hadoop FileSystem关摇,添加以下依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem</artifactId>
<version>你的Flink版本</version>
</dependency>
<!-- 如果需要寫(xiě)入Hadoop HDFS,確保你的Hadoop版本兼容碾阁,并添加相應(yīng)依賴 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>你的Hadoop版本</version>
</dependency>
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Properties;
public class FlinkKafkaTLVToHadoop {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置Kafka消費(fèi)者
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");
kafkaProps.setProperty("group.id", "my-group");
kafkaProps.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"my-input-topic",
new SimpleStringSchema(),
kafkaProps
);
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
// 解析TLV數(shù)據(jù)
DataStream<MyParsedData> parsedDataStream = kafkaStream
.process(new TLVParserFunction()); // 實(shí)現(xiàn)自定義的解析函數(shù)
// 寫(xiě)入Hadoop HDFS
parsedDataStream.addSink(new HadoopFsSink()); // 實(shí)現(xiàn)自定義的Hadoop Sink
env.execute("Flink Kafka TLV to Hadoop Job");
}
// 自定義的Hadoop Fs Sink
public static class HadoopFsSink implements SinkFunction<MyParsedData> {
private FileSystem fileSystem;
@Override
public void invoke(MyParsedData value) throws IOException {
if (fileSystem == null) {
fileSystem = FileSystem.get(new java.net.URI("hdfs://namenode:8020"), new Configuration());
}
fileSystem.write(new Path("/path/to/hdfs/file"), value.toString());
}
}
// 自定義的解析函數(shù)
public static class TLVParserFunction extends RichMapFunction<String, MyParsedData> {
@Override
public MyParsedData map(String tlv) throws Exception {
// 實(shí)現(xiàn)具體的解析邏輯
return new MyParsedData(/* 解析后的數(shù)據(jù) */);
}
}
// 自定義的數(shù)據(jù)類型输虱,表示解析后的數(shù)據(jù)
public static class MyParsedData {
// 字段和構(gòu)造函數(shù)
}
}
參考文件
1、Flink基本原理
2脂凶、官網(wǎng)文檔
3宪睹、Flink學(xué)習(xí)教程-文檔
4、Flink學(xué)習(xí)教程-視頻
5蚕钦、Flink常用API詳解