Flink 使用介紹相關(guān)文檔目錄
Flink內(nèi)置數(shù)據(jù)源
Text file
讀取磁盤或者HDFS中的文件作為數(shù)據(jù)源彪薛。
唯一的參數(shù)file path可以指定:
- file:///path/to/file.txt
- hdfs:///path/to/file.txt
注意:
- 如果不填寫前綴
file://
或者hdfs://
,默認(rèn)為file://
- 使用Flink讀取HDFS文件系統(tǒng)傀缩,需要去官網(wǎng)下載對應(yīng)Pre-bundled Hadoop包案铺。這里給出的鏈接是適用于Hadoop 2.8.3折晦。之后將這個jar復(fù)制到flink安裝位置的lib目錄中货抄。
val stream = env.readTextFile("/path/to/file.txt")
socketTextStream
使用socket作為數(shù)據(jù)源。但不推薦socket在生產(chǎn)環(huán)境中作為數(shù)據(jù)源蒲跨。原因如下:
- socket無狀態(tài)译断,也不能replay。故無法保證數(shù)據(jù)精準(zhǔn)投送或悲。
- socket數(shù)據(jù)源并行度只能是1孙咪,無法很好利用并發(fā)處理性能。
SocketTextStream適合用于debug或者是測試用途巡语。
val stream = env.socketTextStream("localhost", 9000)
fromElements
將一系列元素作為數(shù)據(jù)源翎蹈。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3);
fromCollection
和fromElements方法類似,不同的是該方法接收一個集合對象男公,而不是可變參數(shù)荤堪。如下所示:
val stream = env.fromCollection(Array(1, 2, 3))
Kafka 數(shù)據(jù)源
該數(shù)據(jù)源用于接收Kafka的數(shù)據(jù)。
使用Kafka數(shù)據(jù)源之前需要先確定Kafka的版本枢赔,引入對應(yīng)的Kafka Connector以來澄阳。對應(yīng)關(guān)系如下所示。
Kafka 版本 | Maven 依賴 |
---|---|
0.8.x | flink-connector-kafka-0.8_2.11 |
0.9.x | flink-connector-kafka-0.9_2.11 |
0.10.x | flink-connector-kafka-0.10_2.11 |
0.11.x | flink-connector-kafka-0.11_2.11 |
1.0 以上 | flink-connector-kafka_2.11 |
引入Maven依賴踏拜。以flink-connector-kafka_2.11
為例碎赢,添加以下依賴到pom.xml
文件:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
在集群中運行時,為了減少提交jar包的大小速梗,需要將該依賴設(shè)置為provided肮塞。然后把此依賴包復(fù)制到Flink各個節(jié)點安裝位置的lib目錄中襟齿。
一個簡單的使用例子如下:
// 設(shè)置Kafka屬性
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.100.128:9092")
properties.setProperty("group.id", "test")
// 創(chuàng)建Kafka數(shù)據(jù)源,其中test為topic名稱
val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)
DeserializationSchema
DeserializationSchema用于將接收到的二進制數(shù)據(jù)轉(zhuǎn)換為Java或Scala對象枕赵。Kafka Connector提供了如下4種DeserializationSchema:
- TypeInformationSerializationSchema:使用Flink的TypeInformation反序列化蕊唐。如果上游數(shù)據(jù)也是通過Flink TypeInformation序列化后寫入的,這里使用此schema最為合適烁设。
- JsonDeserializationSchema :將獲取的數(shù)據(jù)轉(zhuǎn)換為JSON格式替梨。這里有一個坑,如果發(fā)送過來的數(shù)據(jù)不是合法的JSON格式装黑,數(shù)據(jù)源會拋出異常導(dǎo)致TaskManager重啟副瀑。如果需要對不合法的JSON數(shù)據(jù)容錯,需要實現(xiàn)自定義的DeserializationSchema恋谭。
- AvroDeserializationSchema:讀取Avro格式的數(shù)據(jù)糠睡。
- SimpleStringSchema:轉(zhuǎn)換接收到的數(shù)據(jù)為字符串。
自定義DeserializationSchema
所有的Schema需要實現(xiàn)DeserializationSchema疚颊。該接口源碼如下所示:
@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
* Deserializes the byte message.
*
* @param message The message, as a byte array.
*
* @return The deserialized message as an object (null if the message cannot be deserialized).
*/
T deserialize(byte[] message) throws IOException;
/**
* Method to decide whether the element signals the end of the stream. If
* true is returned the element won't be emitted.
*
* @param nextElement The element to test for the end-of-stream signal.
* @return True, if the element signals end of stream, false otherwise.
*/
boolean isEndOfStream(T nextElement);
}
方法解釋:
- deserialize:將二進制消息轉(zhuǎn)換為某類型消息狈孔。
- isEndOfStream:表示是否是最后一條數(shù)據(jù)。
以SimpleStringSchema為例展示下怎么編寫自定義的DeserializationSchema材义。
相關(guān)代碼如下:
@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
// SerializationSchema接口的方法省略
@Override
public String deserialize(byte[] message) {
return new String(message, charset);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
// ...
}
起始位置屬性配置
使用示例:
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets() // the default behaviour
方法解釋:
- setStartFromEarliest:從最早兒元素開始消費
- setStartFromLatest:從最近的元素開始消費
- setStartFromTimestamp:從指定時間戳的數(shù)據(jù)開始消費
- setStartFromGroupOffsets:這是默認(rèn)的配置均抽。從消費組的offset開始消費。必須配置group.id配置項其掂。
Topic和分區(qū)感知
Topic感知
可以使用如下構(gòu)造函數(shù)創(chuàng)建FlinkKafkaConsumer:
FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
和指定topic名稱不同的是油挥,這里傳入的是一個正則表達式。所有名稱匹配該正則表達式的topic都會被訂閱款熬。如果配置了分區(qū)感知(配置flink.partition-discovery.interval-millis
為非負(fù)數(shù))深寥,Job啟動之后kafka新創(chuàng)建的topic如果匹配該正則,也會被訂閱到贤牛。
分區(qū)感知
在Job運行過程中如果kafka新創(chuàng)建了partition惋鹅,F(xiàn)link可以動態(tài)感知到,然后對其中數(shù)據(jù)進行消費殉簸。整個過程仍然可以保證exactly once語義闰集。
默認(rèn)情況分區(qū)感知是禁用的。如果要開啟分區(qū)感知喂链,可以設(shè)置flink.partition-discovery.interval-millis
返十,即分區(qū)感知觸發(fā)時間間隔。
實現(xiàn)自定義數(shù)據(jù)源
自定義數(shù)據(jù)源需要實現(xiàn)Flink提供的SourceFunction接口椭微。
SourceFunction接口的定義如下:
@Public
public interface SourceFunction<T> extends Function, Serializable {
void run(SourceContext<T> ctx) throws Exception;
void cancel();
}
run方法
run方法為數(shù)據(jù)源向下游發(fā)送數(shù)據(jù)的主要邏輯洞坑。編寫套路為:
- 不斷調(diào)用循環(huán)發(fā)送數(shù)據(jù)。
- 使用一個狀態(tài)變量控制循環(huán)的執(zhí)行蝇率。當(dāng)
cancel
方法執(zhí)行后必須能夠跳出循環(huán)迟杂,停止發(fā)送數(shù)據(jù)刽沾。 - 使用SourceContext的collect等方法將元素發(fā)送至下游。
- 如果使用Checkpoint排拷,在SourceContext collect數(shù)據(jù)的時候必須加鎖侧漓。防止checkpoint操作和發(fā)送數(shù)據(jù)操作同時進行。
cancel方法:
cancel
方法在數(shù)據(jù)源停止的時候調(diào)用监氢。cancel
方法必須能夠控制run
方法中的循環(huán)布蔗,停止循環(huán)的運行。并做一些狀態(tài)清理操作浪腐。
SourceContext類
SourceContext在SourceFunction中使用纵揍,用于向下游發(fā)送數(shù)據(jù),或者是發(fā)送watermark议街。
SourceContext的方法包括:
- collect:向下游發(fā)送數(shù)據(jù)泽谨。有如下三種情況:
- 如果使用ProcessingTime,該元素不攜帶timestamp特漩。
- 如果使用IngestionTime吧雹,元素使用系統(tǒng)當(dāng)前時間作為timestamp。
- 如果使用EventTime涂身,元素不攜帶timestamp雄卷。需要在數(shù)據(jù)流后續(xù)為元素指定timestamp(assignTimestampAndWatermark)。
- collectWithTimestamp:向下游發(fā)送帶有timestamp的數(shù)據(jù)访得。和collect方法一樣也有如下三種情況:
- 如果使用ProcessingTime龙亲,timestamp會被忽略
- 如果使用IngestionTime陕凹,使用系統(tǒng)時間覆蓋timestamp
- 如果使用EventTime悍抑,使用指定的timestamp
- emitWatermark:向下游發(fā)送watermark。watermark也包含一個timestamp杜耙。向下游發(fā)送watermark意味著所有在watermark的timestamp之前的數(shù)據(jù)已經(jīng)到齊搜骡。如果在watermark之后,收到了timestamp比該watermark的timestamp小的元素佑女,該元素會被認(rèn)為遲到记靡,將會被系統(tǒng)忽略,或者進入到旁路輸出(side output)团驱。
- markAsTemporarilyIdle:標(biāo)記此數(shù)據(jù)源暫時閑置摸吠。該數(shù)據(jù)源暫時不會發(fā)送任何數(shù)據(jù)和watermark。僅對IngestionTime和EventTime生效嚎花。下游任務(wù)前移watermark的時候?qū)⒉粫俚却粯?biāo)記為閑置的數(shù)據(jù)源的watermark寸痢。
CheckpointedFunction
如果數(shù)據(jù)源需要保存狀態(tài),那么就需要實現(xiàn)CheckpointedFunction中的相關(guān)方法紊选。
CheckpointedFunction包含如下方法:
- snapshotState:保存checkpoint的時候調(diào)用啼止。需要在此方法中編寫狀態(tài)保存邏輯
- initializeState:在數(shù)據(jù)源創(chuàng)建或者是從checkpoint恢復(fù)的時候調(diào)用道逗。此方法包含數(shù)據(jù)源的狀態(tài)恢復(fù)邏輯。
樣例
Flink官方給出的樣板Source献烦。這個數(shù)據(jù)源會發(fā)送0-999到下游系統(tǒng)滓窍。代碼如下所示:
public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
private long count = 0L;
// 使用一個volatile類型變量控制run方法內(nèi)循環(huán)的運行
private volatile boolean isRunning = true;
// 保存數(shù)據(jù)源狀態(tài)的變量
private transient ListState<Long> checkpointedCount;
public void run(SourceContext<T> ctx) {
while (isRunning && count < 1000) {
// this synchronized block ensures that state checkpointing,
// internal state updates and emission of elements are an atomic operation
// 此處必須要加鎖,防止在checkpoint過程中巩那,仍然發(fā)送數(shù)據(jù)
synchronized (ctx.getCheckpointLock()) {
ctx.collect(count);
count++;
}
}
}
public void cancel() {
// 設(shè)置isRunning為false吏夯,終止run方法內(nèi)循環(huán)的運行
isRunning = false;
}
public void initializeState(FunctionInitializationContext context) {
// 獲取存儲狀態(tài)
this.checkpointedCount = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("count", Long.class));
// 如果數(shù)據(jù)源是從失敗中恢復(fù),則讀取count的值即横,恢復(fù)數(shù)據(jù)源count狀態(tài)
if (context.isRestored()) {
for (Long count : this.checkpointedCount.get()) {
this.count = count;
}
}
}
public void snapshotState(FunctionSnapshotContext context) {
// 保存數(shù)據(jù)到狀態(tài)變量
this.checkpointedCount.clear();
this.checkpointedCount.add(count);
}
}