Flink 程序看起來跟普通程序沒什么區(qū)別疚鲤,都是處理數(shù)據(jù)流集歇。每個程序都有幾個相同的基礎(chǔ)部分組成:
1、申請執(zhí)行環(huán)境
2际歼、加載/創(chuàng)造原始數(shù)據(jù)
3鹅心、聲明對于這些數(shù)據(jù)的轉(zhuǎn)換過程
4、聲明存儲這些轉(zhuǎn)換后數(shù)據(jù)的目標(biāo)地址
5颅筋、觸發(fā)程序結(jié)束
接下來會對每個步驟做一個概述输枯,想要了解細(xì)節(jié)的話請參考各自的部分桃熄。
Java DataStream API 所有的核心類都可以在這里找到。
1碉京、申請執(zhí)行環(huán)境
StreamExecutionEnvironment
是所有 Flink 程序的基礎(chǔ)缎讼,通過調(diào)用它的以下這幾個靜態(tài)方法可以得到一個執(zhí)行環(huán)境:
StreamExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment.createLocalEnvironment()
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
如官方例子的main方法中第一行程序:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
一般來說血崭,直接調(diào)用
getExecutionEnvironment()
即可,他會根據(jù)上下文自動處理咽瓷。如果你像平時一樣是在本地 IDE 執(zhí)行程序茅姜,他會創(chuàng)建一個本地環(huán)境在你自己的機(jī)器上執(zhí)行代碼月匣。如果你將代碼打包成一個jar包锄开,并且通過命令行(flink run XXX.jar)的方式提交給 Flink 集群,F(xiàn)link 會為你的程序生成一個執(zhí)行環(huán)境头遭,讓他在集群內(nèi)完成執(zhí)行。
2袜香、加載/創(chuàng)造原始數(shù)據(jù)
談到數(shù)據(jù)源的聲明鲫惶,以基于文件讀取型的場景為例欠母,F(xiàn)link 的執(zhí)行環(huán)境支持了多種方法讓程序從文件讀取:
- 逐行讀取
- 讀取為 CSV 文件
- 使用其他提供的源
下面以從txt文件逐行讀取為例猬腰,講解代碼實現(xiàn):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
首先姑荷,依然是申請執(zhí)行環(huán)境缩擂,之后調(diào)用該執(zhí)行環(huán)境對象的env.readTextFile
方法即可得到一個數(shù)據(jù)流。
除了readTestFile()
方法懈费,F(xiàn)link 還支持別的方式憎乙,同時也支持其他場景的數(shù)據(jù)源加載叉趣,見官方文檔疗杉,這里僅做列表阵谚,不詳細(xì)解釋:
2.1 文件讀取型數(shù)據(jù)源
-
readTextFile(path)
:從 text 文件中逐行讀取,讀取進(jìn)來后的事件是 string 型的 -
readFile(fileInputFormat, path)
:讀取fileInputFormat
類型的文檔 -
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
:前面兩個方法的內(nèi)部調(diào)用實現(xiàn)
2.2 Socket 型數(shù)據(jù)源
-
socketTextStream
:從 socket 中讀取烟具,各元素以分隔符分開
2.3 集合型
-
fromCollection(Collection)
:從Java.util.Collection
中讀取數(shù)據(jù)生成數(shù)據(jù)流梢什。集合中所有元素必須是同類型。 -
fromCollection(Iterator, Class)
:從迭代器中讀取數(shù)據(jù)生成數(shù)據(jù)流朝聋。參數(shù)Class
用以聲明元素類型 -
fromElements(T ...)
:從數(shù)組中讀取對象數(shù)據(jù)生成數(shù)據(jù)流嗡午,所有對象必須是同類型。 -
fromParallelCollection(SplittableIterator, Class)
:Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator. -
generateSequence(from, to)
:在from~to之間并行生成數(shù)據(jù)返回為一個數(shù)組玖翅。
2.4 定制
-
addSource
:添加一個新的數(shù)據(jù)源生成方法翼馆。如:從 Kafka 中讀取時,可以這么寫:addSource(new FlinkKafkaConsumer<>(...))
金度。具體語法參考connector。
3猜极、聲明對于這些數(shù)據(jù)的轉(zhuǎn)換過程
得到源數(shù)據(jù)流后中姜,接下來就是調(diào)用 Flink 的各個 Operators 來對數(shù)據(jù)流中的數(shù)據(jù)做“變身”動作了。以下面的代碼為例:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
這段代碼調(diào)用了map
這個算子跟伏,將input流中的所有string型數(shù)據(jù)都轉(zhuǎn)換成int型丢胚,生成一個新的數(shù)據(jù)流賦值給parsed。
當(dāng)然受扳,你可以做更復(fù)雜的“變身”携龟,更多官方支持的算子及其語法介紹點我了解。
4勘高、聲明存儲這些轉(zhuǎn)換后數(shù)據(jù)的目標(biāo)地址
當(dāng)你得到一個最終狀態(tài)的數(shù)據(jù)流后峡蟋,接下來要考慮的就是如何把這個數(shù)據(jù)流寫道外部系統(tǒng)了,也就是創(chuàng)建一個 sink华望。最常見的就是打印到控制臺蕊蝗,可以這么實現(xiàn):
parsed.print();
Data sinks 消費數(shù)據(jù)流,并且把他們寫入文件赖舟、sockets蓬戚、外部系統(tǒng),或者直接打印他們宾抓。Flink 內(nèi)置了許多輸出格式子漩。
-
writeAsText() / TextOutputFormat
:以string形式按行輸出元素。該string是通過調(diào)用元素的toString()
方法得到的石洗。 -
writeAsCsv(...) / CsvOutputFormat
:每個元素轉(zhuǎn)換成一個逗號隔開的二元數(shù)組痛单。行于列的分隔符可以配置。列的值同樣是調(diào)用元素的toString()
方法得到的劲腿。 -
print() / printToErr()
:直接打印到標(biāo)準(zhǔn)控制臺旭绒。還可以傳一個前綴參數(shù)加載字符串之前,用以區(qū)分不同的print()調(diào)用焦人。 當(dāng)集群開啟并發(fā)模式且大于1的時候挥吵,還會把產(chǎn)生該數(shù)據(jù)的task的標(biāo)識符作為前綴加到輸出之前。 -
writeUsingOutputFormat() / FileOutputFormat
:自定義文件類型花椭,需要支持自定義類型與bytes類型的轉(zhuǎn)換忽匈。 -
writeToSocket
:根據(jù)序列化規(guī)則輸出到socket。 -
addSink
:調(diào)用外部sink方法矿辽〉ぴ剩可以查看connector了解更多郭厌。
一般來說,
write*()
方法都是用來掉使用的雕蔽,這些方法輸出的元素不參與 Flink 的 checkpoint 機(jī)制折柠。產(chǎn)生的直接效果就是,這些方法輸出的文檔可能會有丟失或者延遲批狐,而我們無法通過 Flink 的checkpoint 機(jī)制來保證扇售。
因此,為了保證流數(shù)據(jù)的精確送達(dá)嚣艇,建議使用 StreamingFileSink承冰。同時,通過.addSink(...)
接口調(diào)用的外部實現(xiàn)也可以參與 Flink 的checkpoint機(jī)制食零,從而保證唯一性送達(dá)困乒。
5、觸發(fā)程序結(jié)束
完成上述所有步驟后贰谣,接下來就是觸發(fā)程序執(zhí)行顶燕,方法就是調(diào)用當(dāng)前執(zhí)行環(huán)境的execute()
方法。根據(jù)執(zhí)行環(huán)境的不同冈爹,F(xiàn)link會自動選擇是在本地執(zhí)行涌攻,還是把代碼提交給集群執(zhí)行。
調(diào)用env.execute()
方法后频伤,F(xiàn)link 會等待程序執(zhí)行完成恳谎,并根據(jù)執(zhí)行模式不同做不同的后續(xù)處理。
如果你不想等待job執(zhí)行完成憋肖,你可以異步觸發(fā)程序執(zhí)行因痛,這可以通過調(diào)用當(dāng)前執(zhí)行環(huán)境的executeAysnc()
方法實現(xiàn)。此時岸更,他會返回給你一個 JobClient鸵膏,之后,你可以通過與 JobClient 交互來得到你提交的 job 的執(zhí)行情況怎炊,如:
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
最后這一步在幫助理解 Flink 算子的執(zhí)行時機(jī)以及方法極其重要谭企。Flink 程序都是懶執(zhí)行的,即:當(dāng)程序的main()
方法被執(zhí)行時评肆,數(shù)據(jù)的加載以及轉(zhuǎn)換工作并不會馬上開始债查,F(xiàn)link 會首先創(chuàng)建一個 dataflow graph,并把他們添加進(jìn)去瓜挽。只有當(dāng)execute()
方法被觸發(fā)時盹廷,算子們的處理才真正開始,這與執(zhí)行環(huán)境類型無關(guān)久橙,不管你是在本地執(zhí)行俄占,還是在集群中執(zhí)行管怠,都是如此。
6缸榄、一個完整的例子
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
// 申請執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 聲明數(shù)據(jù)源:從套接字localhost:9999讀取
// 聲明數(shù)據(jù)源的處理過程:.flat.Map().keyBy().window().sum()
// 生成最終的結(jié)果數(shù)據(jù)源
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
// 配置sink渤弛,將結(jié)果集打印到控制臺
dataStream.print();
// 觸發(fā)執(zhí)行
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
編碼完成后,本地啟動socket輸入端:
nc -lk 9999
然后敲入一些單詞碰凶,之后啟動 job,查看 job 的控制臺輸出鹿驼,是你輸入的這些么欲低?