Flink中的流應(yīng)用就是在數(shù)據(jù)流上應(yīng)用各種轉(zhuǎn)化(如:filter,update state,difine window,aggregation)苔巨。數(shù)據(jù)流有各種數(shù)據(jù)源創(chuàng)建而來(lái)(如:消息隊(duì)列,socket流掐禁,文件等)。結(jié)果輸出到sink它改,如寫入文件或者標(biāo)準(zhǔn)輸出。Flink程序可以在多種上下文中運(yùn)行,standalone疆拘,內(nèi)置在其他應(yīng)用中等。應(yīng)用可以在本地JVM中執(zhí)行寂曹,也可以在集群的許多機(jī)器中執(zhí)行哎迄。
示例程序
下面的程序是一個(gè)完成的應(yīng)用,它演示了如何在web soscke上使用window統(tǒng)計(jì)5秒內(nèi)的字?jǐn)?shù)隆圆。你可以復(fù)制代碼然后在你本地運(yùn)行漱挚。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
運(yùn)行應(yīng)用前,先使用 netcat 在命令行中開(kāi)啟輸入流:
nc -lk 9999
輸入一些單詞就會(huì)返回新的結(jié)果渺氧。這些單詞或作為字?jǐn)?shù)統(tǒng)計(jì)應(yīng)用的輸入旨涝。如果你想看到統(tǒng)計(jì)值大于1,可以在5秒內(nèi)一遍又一遍的輸入相同的單詞(如果你做不到侣背,可以增加window的大邪谆)
數(shù)據(jù)源 Data Source
Source指的是你的程序從哪里讀取它的輸入。你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)在你的程序中添加數(shù)據(jù)源秃踩。Flink自帶了一些實(shí)現(xiàn)好的數(shù)據(jù)源函數(shù)衬鱼,淡然你可以實(shí)現(xiàn) SourceFunction 來(lái)實(shí)現(xiàn)自定義的非并行的source或者實(shí)現(xiàn) ParallelSourceFunction 接口或者繼承 RichParallelSourceFunction 來(lái)實(shí)現(xiàn)并行的source。
StreamExecutionEnvironment有一些實(shí)現(xiàn)定義好的數(shù)據(jù)源方法:
基于文件的數(shù)據(jù)源:
- readTextFile(path) - 讀取text文件憔杨。也就是說(shuō)使用 TextInputFormat 一行一行的讀取數(shù)據(jù)鸟赫。
- readFile(fileInputFormat,path) - 使用給定的 input format讀取文件
- readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo) - 這個(gè)方法在flink內(nèi)部,被上面的兩個(gè)方法所調(diào)用消别。它使用給定的fileInputFomat讀取path中的文件抛蚤。根據(jù) watchType 的值,數(shù)據(jù)源會(huì)定期(interval 毫秒)監(jiān)控path中的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY)寻狂,或者僅對(duì)當(dāng)前path下的文件進(jìn)行一次處理岁经,然后退出(FileProcessingMode.PROCESS_ONCE)。使用 pathFilter 蛇券,用戶可以排除不想要處理的文件缀壤。
實(shí)現(xiàn):
在內(nèi)部,F(xiàn)link將讀取文件分為兩個(gè)子任務(wù)纠亚,分別叫做 目錄監(jiān)控 與 數(shù)據(jù)讀取塘慕。每個(gè)任務(wù)都是單獨(dú)運(yùn)行的。目錄監(jiān)控是一個(gè)單線程的任務(wù)蒂胞,而數(shù)據(jù)讀取任務(wù)可以是多線程的并發(fā)任務(wù)图呢。數(shù)據(jù)讀取任務(wù)的并發(fā)度取決于job的并發(fā)度。目錄監(jiān)控的功能在于定期監(jiān)控目錄,發(fā)現(xiàn)需要被處理的文件蛤织,將它們分片 split 然后指定分片給下游的reader赴叹。reader會(huì)進(jìn)行實(shí)際讀取數(shù)據(jù)的操作。每一個(gè)split僅會(huì)被一個(gè)reader讀取指蚜,而一個(gè)reader可能會(huì)讀取多個(gè)split(依次讀绕蚯伞)。
重要說(shuō)明:
1.如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY姚炕,當(dāng)文件被修改后摊欠,它的內(nèi)容會(huì)被全部重新進(jìn)行處理。這會(huì)破壞“精確一次”的語(yǔ)義柱宦,因?yàn)橄蛭募凶芳訑?shù)據(jù)些椒,會(huì)導(dǎo)致整個(gè)文件進(jìn)行重新處理。
2.如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_ONCE掸刊,source只會(huì)掃描path一次然后退出免糕,而不會(huì)等到reader讀取所有數(shù)據(jù)完畢后再退出。當(dāng)然忧侧,reader會(huì)繼續(xù)進(jìn)行數(shù)據(jù)讀取石窑,直到所有文件內(nèi)容都讀取完畢。關(guān)閉source會(huì)導(dǎo)致之后不會(huì)再有checkpoint蚓炬。這將導(dǎo)致故障恢復(fù)時(shí)松逊,需要等待更長(zhǎng)的時(shí)間,因?yàn)閖ob會(huì)從上次checkpoint處會(huì)進(jìn)行重新讀取數(shù)據(jù)肯夏。
基于socket:
- socketTextStream - 從socket讀取數(shù)據(jù)经宏。數(shù)據(jù)可以被 分隔符delimiter 分隔開(kāi)
基于集合:
- fromCollection(Collection) - 從java集合中創(chuàng)造數(shù)據(jù)流。所有集合中的數(shù)據(jù)必須是同樣的類型
- fromCollection(Iterator,Class) - 從iterator中創(chuàng)造數(shù)據(jù)流驯击。class參數(shù)指定了iterator返回的數(shù)據(jù)的類型
- fromElements(T ...) - 從給定的對(duì)象序列中創(chuàng)造數(shù)據(jù)流烁兰。所有對(duì)象必須是相同的類型
- fromParallelCollection(SplitableIterator , Class) - 從iterator中并行的創(chuàng)造數(shù)據(jù)流。class參數(shù)指定了iterator返回的數(shù)據(jù)的類型
- generateSequence(from,to) - 使用給定的interval并行的生成數(shù)字序列
自定義:
- addSource - 使用source function徊都。如沪斟,從Kafka中讀取數(shù)據(jù),你可以使用 addSource(new FlinkKafkaConsumer08<>(...)).
DataStream Transformations
查閱 operator 文檔
Data Sink
Data Sink讀取數(shù)據(jù)流暇矫,并將它們寫入到file,socket,其他系統(tǒng)或者打印它們主之。Flink自帶了一些output format,它們被封裝到一些操作符中:
- writeAsText() / TextOutputFormat - 將數(shù)據(jù)作為一整行string李根,寫入文件槽奕。通過(guò)調(diào)用數(shù)據(jù)的toString方法
- writeAsCsv(...) / CsvOutputFormat - 將 tuple 以逗號(hào)分隔,寫入文件朱巨。行與行以及field之間的分隔符可以自定義。每一個(gè)field的值枉长,是通過(guò)調(diào)用toString方法獲取的
- pring() / pringToErr() - 將數(shù)據(jù)的toString方法的值打印到口紅紙條冀续∏矸恚可以選擇前綴,在打印輸出內(nèi)容前先打印前綴洪唐。這能夠區(qū)分不同的print的內(nèi)容钻蹬。如果并發(fā)度大于1,輸出同樣會(huì)打印一個(gè)task的標(biāo)識(shí)符凭需。
- writeUsingOutputFormat() / FileOutputFormat - 使用自定義文件輸出的基類與方法问欠。支持自定義的 對(duì)象-字節(jié) 的轉(zhuǎn)化。
- writeToSocket - 根據(jù) SerializationSchema 將數(shù)據(jù)寫入socket
- addSink - 調(diào)用傳入的自定義 sink function粒蜈。Flink通過(guò)實(shí)現(xiàn) sink function可以與其他系統(tǒng)連接起來(lái)(如kafka)
注意的是 write*() 方法主要用于調(diào)試的目的顺献。它們沒(méi)有參與flink的checkpoint過(guò)程,這就意味著使用這些函數(shù)是“at-least-once”至少一次語(yǔ)義枯怖。數(shù)據(jù)如何寫入目標(biāo)系統(tǒng)是由OutputFormat決定的注整,也就是說(shuō)發(fā)送到OutputFormat的數(shù)據(jù)并不一定會(huì)立即寫入目標(biāo)系統(tǒng)(如批量寫入情況)。因此度硝,在遇到故障時(shí)肿轨,這些數(shù)據(jù)有可能會(huì)丟失。
為了穩(wěn)定地蕊程,精確一致的將流數(shù)據(jù)寫入問(wèn)加你系統(tǒng)椒袍,建議使用 flink-connector-filesystem。當(dāng)然藻茂,如果自定義了sink function驹暑,通過(guò) addSink 添加該自定義的sink,也可以參與flink的checkpoint過(guò)程捌治,保持 exactly-once 語(yǔ)義岗钩。
Iterator
迭代流程序?qū)崿F(xiàn)了step function,并且內(nèi)置在 IterativeStream中肖油。由于DataStream程序可能不會(huì)停止兼吓,因此iteration中不會(huì)有最大數(shù)量限制。你需要定義流中的哪些數(shù)據(jù)需要繼續(xù)迭代森枪,哪些數(shù)據(jù)可以發(fā)送到下游的操作符视搏,你可以使用split或者filter實(shí)現(xiàn)县袱。下面我們使用 filter 來(lái)演示。首先式散,我們定義一個(gè) IterativeStream :
IterativeStream<Integer> iteration = input.iterate();
然后,我們定義在循環(huán)中,需要對(duì)數(shù)據(jù)流做哪些操作(下面我們就簡(jiǎn)單的使用map作為演示)
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
為了定義迭代器何時(shí)關(guān)閉编饺,可以調(diào)用 IterativeStream 的 closeWith(feedbackStream) 方法。傳入 closeWith() 的數(shù)據(jù)流會(huì)再次進(jìn)入迭代器透且,放到迭代器的head。一個(gè)常用的模式是秽誊,使用filter將流的一部分重新放入迭代器,而另一部分下發(fā)到下游操作符這些filter可以定義“終止”的邏輯锅论,也就是一個(gè)數(shù)據(jù)可以不再進(jìn)入迭代器,而是被轉(zhuǎn)發(fā)到下游操作符棍厌。
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
例如,下面的程序就是對(duì)數(shù)據(jù)進(jìn)行減1操作耘纱,直到為0:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
Execution Parameter 執(zhí)行參數(shù)
StreamExecutionEnvironment 包括 ExecutionConfig 毕荐,它允許設(shè)置運(yùn)行時(shí)所需的job配置束析。
請(qǐng)參閱 execution configuration 獲取更多參數(shù)的解釋。下面的參數(shù)僅屬于 DataStream API:
- setAutoWatermarkInterval(long milliseconds) : 設(shè)置 watermark 發(fā)射的間隔憎亚。你可以公共 long getAutoWatermartkInterval() 獲取當(dāng)前的值员寇。
故障容忍
控制延遲
默認(rèn)情況下,數(shù)據(jù)在網(wǎng)絡(luò)間傳輸時(shí)第美,并不是一個(gè)一個(gè)的傳輸(造成不必要的網(wǎng)絡(luò)擁堵)蝶锋,而是緩存后一起傳輸。buffer的大小可以在Flink 的配置文件中配置什往。盡管這種方式可以優(yōu)化吞吐率扳缕,但是當(dāng)輸入流的速度不夠快時(shí),會(huì)造成延遲問(wèn)題别威。為了平衡吞吐率和延遲躯舔,你可以使用 env.setBufferTimeout(timeoutMillis) 來(lái)設(shè)置最大等待時(shí)間。超過(guò)這個(gè)時(shí)間后省古,即便buffer沒(méi)有填滿粥庄,也要發(fā)出去。默認(rèn)值為100ms豺妓。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
為了最大化吞吐率惜互,設(shè)置 setBufferTimeout(-1) 會(huì)移除超時(shí)設(shè)置布讹,僅當(dāng)buffer填滿后才發(fā)送。為了最小化延遲训堆,設(shè)置超時(shí)的值接近0(如 5 或 10 毫秒)炒事。應(yīng)該避免設(shè)置值為0,因?yàn)檫@會(huì)導(dǎo)致服務(wù)性能下降蔫慧。
調(diào)試 Debugging
在提交任務(wù)到分布式集群運(yùn)行前,最好確認(rèn)程序可以按預(yù)期運(yùn)行权薯。因此姑躲,實(shí)現(xiàn)一個(gè)數(shù)據(jù)分析應(yīng)用,通常是一個(gè)增量的過(guò)程:檢查結(jié)果盟蚣,調(diào)試黍析,優(yōu)化。
Flink提供了本地IDE調(diào)試的功能屎开,簡(jiǎn)化了數(shù)據(jù)分析應(yīng)用的開(kāi)發(fā)阐枣。包括加載測(cè)試數(shù)據(jù),收集結(jié)果數(shù)據(jù)奄抽。這一部分會(huì)顯示如何簡(jiǎn)化flink程序的開(kāi)發(fā)蔼两,便于測(cè)試調(diào)試程序。
本地運(yùn)行環(huán)境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();
加載測(cè)試數(shù)據(jù)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
注意:需要提供數(shù)據(jù)烈性逞度,iterator要實(shí)現(xiàn) Serializable额划。不能并發(fā)執(zhí)行
迭代Sink
import org.apache.flink.streaming.experimental.DataStreamUtils
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)