Google翻譯
Flink程序是實現(xiàn)分布式集合轉(zhuǎn)換的常規(guī)程序(例如败潦,過濾街佑,映射,狀態(tài)更新,聯(lián)合键闺,分組,窗口定義和聚合)嫂冻。集合從數(shù)據(jù)源中初始化創(chuàng)建(例如通過讀取文件恤批,kafka主題或者從本地,內(nèi)存集合中)尔艇。結(jié)果通過接收器返回尔许,接收器可以將數(shù)據(jù)寫入(分布式)文件或標(biāo)準(zhǔn)輸出(例如,命令行終端)Flink程序在各種環(huán)境中運(yùn)行终娃,獨(dú)立味廊,或內(nèi)嵌于其他程序中。它可以在本地JVM或多機(jī)器組成的集群中執(zhí)行棠耕。
取決于數(shù)據(jù)源的類型余佛,即有界源和無界源,你可以使用DataSet API編寫批處理程序窍荧,DataStream API編寫流處理程序辉巡。本指南將介紹兩種API共有的基本概念,但請參閱我們的流處理指南和批處理指南蕊退,了解有關(guān)使用每個API編寫程序的具體信息郊楣。
數(shù)據(jù)集和數(shù)據(jù)流
Flink具有特殊類DataSet和DataStream來表示程序中的數(shù)據(jù)憔恳。您可以將它們視為可以包含重復(fù)項的不可變數(shù)據(jù)集合。在DataSet的情況下痢甘,數(shù)據(jù)是有限的喇嘱,而對于DataStream,元素的數(shù)量可以是無界的塞栅。
這些集合在某些關(guān)鍵方面與常規(guī)Java集合不同者铜。首先,它們是不可變的放椰,這意味著一旦創(chuàng)建它們就無法添加或刪除元素作烟。你也不能簡單地檢查里面的元素。
最初通過在Flink程序中添加源來創(chuàng)建集合砾医,并通過使用諸如map拿撩,filter等API方法對它們進(jìn)行轉(zhuǎn)換來從這些集合中派生新集合。
Flink程序剖析
Flink程序看起來像是轉(zhuǎn)換數(shù)據(jù)集合的常規(guī)程序如蚜。每個程序包含相同的基本部分:
- 包含一個執(zhí)行環(huán)境
- 加載/創(chuàng)建初始數(shù)據(jù)
- 指定數(shù)據(jù)的轉(zhuǎn)換
- 指定放置計算結(jié)果的位置
- 觸發(fā)程序執(zhí)行
我們現(xiàn)在將概述每個步驟压恒,請參閱相應(yīng)部分以獲取更多詳細(xì)信息。請注意错邦,Java DataSet API的所有核心類都可以在org.apache.flink.api.java包中找到探赫,而Java DataStream API的類可以在org.apache.flink.streaming.api中找到。
StreamExecutionEnvironment是所有Flink程序的基礎(chǔ)撬呢。您可以使用這些靜態(tài)方法獲取一個
StreamExecutionEnvironment:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
通常伦吠,您只需要使用getExecutionEnvironment(),因為這將根據(jù)上下文做正確的事情: 如果您在IDE中執(zhí)行程序或作為常規(guī)Java程序魂拦,它將創(chuàng)建一個本地環(huán)境毛仪,該環(huán)境將在本地計算機(jī)上執(zhí)行您的程序。如果您從程序中創(chuàng)建了一個JAR文件芯勘,并通過命令行調(diào)用它箱靴,則Flink集群管理器將執(zhí)行您的main方法,getExecutionEnvironment()將返回一個執(zhí)行環(huán)境荷愕,用于在集群上執(zhí)行您的程序衡怀。
對于指定數(shù)據(jù)源,執(zhí)行環(huán)境有幾種方法可以使用各種方法從文件中讀嚷贩:您可以逐行讀取它們,CSV文件或使用完全自定義數(shù)據(jù)輸入格式茄靠。要將文本文件作為一系列行讀取茂契,您可以使用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
這將為您提供一個DataStream,然后您可以在其上應(yīng)用轉(zhuǎn)換來創(chuàng)建新的派生DataStream慨绳。
您可以通過使用轉(zhuǎn)換函數(shù)調(diào)用DataStream上的方法來應(yīng)用轉(zhuǎn)換掉冶。例如真竖,地圖轉(zhuǎn)換如下所示:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
這將通過將原始集合中的每個String轉(zhuǎn)換為Integer來創(chuàng)建新的DataStream。
一旦有了包含最終結(jié)果的DataStream厌小,就可以通過創(chuàng)建接收器將其寫入外部系統(tǒng)恢共。這些只是創(chuàng)建接收器的一些示例方法:
writeAsText(String path)
print()
一旦指定了完整的程序,就需要通過調(diào)用execute()來觸發(fā)程序在StreamExecutionEnvironment上執(zhí)行璧亚。根據(jù)ExecutionEnvironment的類型讨韭,將在本地計算機(jī)上觸發(fā)執(zhí)行或提交程序以在群集上執(zhí)行。
execute()方法返回一個JobExecutionResult癣蟋,它包含執(zhí)行時間和累加器結(jié)果透硝。
有關(guān)流數(shù)據(jù)源和接收器的信息,請參閱流指南疯搅,以及有關(guān)DataStream上支持的轉(zhuǎn)換的更深入信息濒生。
有關(guān)批處理數(shù)據(jù)源和接收器的信息,請查看批處理指南幔欧,以及有關(guān)DataSet支持的轉(zhuǎn)換的更深入信息罪治。
懶評估
所有Flink程序都是懶惰地執(zhí)行的:當(dāng)執(zhí)行程序的main方法時,數(shù)據(jù)加載和轉(zhuǎn)換不會直接發(fā)生礁蔗。而是創(chuàng)建每個操作并將其添加到程序的計劃中觉义。當(dāng)執(zhí)行環(huán)境上的execute()調(diào)用顯式觸發(fā)執(zhí)行時,實際執(zhí)行操作瘦麸。程序是在本地執(zhí)行還是在集群上執(zhí)行取決于執(zhí)行環(huán)境的類型谁撼。
懶惰的評估使您可以構(gòu)建Flink作為一個整體計劃單元執(zhí)行的復(fù)雜程序。
指定鍵
某些轉(zhuǎn)換(join滋饲,coGroup厉碟,keyBy,groupBy)要求在元素集合上定義鍵屠缭。其他轉(zhuǎn)換(Reduce箍鼓,GroupReduce,Aggregate呵曹,Windows)允許數(shù)據(jù)在應(yīng)用之前在鍵上分組款咖。
數(shù)據(jù)集分組:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
數(shù)據(jù)流通過指定鍵通過:
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink的數(shù)據(jù)模型不基于鍵值對。因此奄喂,無需將數(shù)據(jù)集類型物理打包到鍵和值中铐殃。鍵是“虛擬的”:它們被定義為實際數(shù)據(jù)上的函數(shù),以指導(dǎo)分組操作符跨新。
在下面的討論中富腊,我們將使用DataStream API和keyBy。對于DataSet API域帐,您只需要用DataSet和groupBy替換赘被。
元組定義鍵
最簡單的情況是在元組的一個或多個字段上對元組進(jìn)行分組:
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
元組以第一個字段分組
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
在這里是整,我們將元組分組在由第一個和第二個字段組成的復(fù)合鍵上。
關(guān)于嵌套元組的注釋:如果你有一個帶有嵌套元組的DataStream民假,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)將使系統(tǒng)使用完整的Tuple2作為鍵(以Integer和Float為鍵)浮入。如果要“導(dǎo)航”到嵌套的Tuple2中,則必須使用下面解釋的字段表達(dá)式羊异。
使用字段表達(dá)式定義鍵
您可以使用基于字符串的字段表達(dá)式來引用嵌套字段事秀,并定義用于分組,排序球化,連接或coGrouping的鍵秽晚。
字段表達(dá)式可以非常輕松地選擇(嵌套)復(fù)合類型中的字段,例如Tuple和POJO類型筒愚。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
字段表達(dá)式語法:
- 按字段名稱選擇POJO字段赴蝇。例如,“user”指的是POJO類型的“user”字段巢掺。
- 按字段名稱或0偏移字段索引選擇元組字段句伶。例如,“f0”和“5”分別表示Java元組類型的第一和第六字段陆淀。
- 您可以在POJO和Tuples中選擇嵌套字段考余。例如,“user.zip”指的是POJO的“zip”字段轧苫,其存儲在POJO類型的“user”字段中楚堤。持任意嵌套和混合POJO和元組,例如“f1.user.zip”或“user.f3.1.zip”含懊。
- 您可以使用“*”通配符表達(dá)式選擇完整類型身冬。這也適用于非Tuple或POJO類型的類型。
字段表達(dá)式例子:
public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
這些是上面示例代碼的有效字段表達(dá)式:
- "count": WC類中的count字段岔乔。
- "complex": 遞歸選擇POJO類型ComplexNestedClass的字段復(fù)合體的所有字段酥筝。
- "complex.word.f2": 選擇嵌套Tuple3的最后一個字段。
- "complex.hadoopCitizen": 選擇Hadoop IntWritable類型
使用Selector Function定義鍵
定義鍵的另一種方法是“鍵選擇器”功能雏门。鍵選擇器函數(shù)將單個元素作為輸入并返回元素的鍵嘿歌。鍵可以是任何類型,并且可以從確定性計算中導(dǎo)出茁影。
以下示例顯示了一個鍵選擇器函數(shù)宙帝,它只返回一個對象的字段:
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
});
指定轉(zhuǎn)換函數(shù)
大多數(shù)轉(zhuǎn)換都需要用戶定義的函數(shù)。本節(jié)列出了如何指定它們的不同方法
實現(xiàn)接口
最基本的方法是實現(xiàn)一個提供的接口:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
匿名類
您可以將函數(shù)作為匿名類傳遞:
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambdas
Flink支持Lambdas
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
Rich 函數(shù)
需要用戶定義函數(shù)的所有轉(zhuǎn)換都可以將Rich函數(shù)作為參數(shù)募闲。例如步脓,不是這樣:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
你可以這樣寫:
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
并像往常一樣將函數(shù)傳遞給map函數(shù)
data.map(new MyMapFunction());
Rich函數(shù)也可以定義為匿名類:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
除了用戶定義的函數(shù)(map,reduce等)之外,Rich函數(shù)還提供了四種方法:open沪编,close,getRuntimeContext和setRuntimeContext年扩。這些用于參數(shù)化函數(shù)(請參閱將參數(shù)傳遞給函數(shù))蚁廓,創(chuàng)建和完成本地狀態(tài),訪問廣播變量(請參閱廣播變量)以及訪問運(yùn)行時信息(如累加器和計數(shù)器)(請參閱累加器和計數(shù)器)以及有關(guān)信息的信息厨幻。迭代(參見迭代)相嵌。
支持的數(shù)據(jù)類型
Flink對DataSet或DataStream中可以包含的元素類型設(shè)置了一些限制。原因是系統(tǒng)分析類型以確定有效的執(zhí)行策略况脆。
有六種不同類別的數(shù)據(jù)類型:
- Java Tuples 和 Scala Case 類
- Java POJOs
- 原始類型
- 常規(guī)類
- 值
- Hadoop Writables
- Special Types
Tuples
Tuples是包含固定數(shù)量的具有各種類型的字段的復(fù)合類型饭宾。 Java API提供從Tuple1到Tuple25的類。Tuples的每個字段都可以是包含更多Tuple的任意Flink類型格了,從而產(chǎn)生嵌套元組看铆。可以使用字段名稱tuple.f4直接訪問Tuple的字段盛末,或使用通用getter方法tuple.getField(int position).字段索引從0開始弹惦。請注意,這與Scala元組形成對比悄但,但它與Java的一般索引更為一致棠隐。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(0); // also valid .keyBy("f0")
POJOs
如果滿足以下要求,則Flink將Java和Scala類視為特殊的POJO數(shù)據(jù)類型:
- 類為public
- 無參的public 構(gòu)造函數(shù)
- 所有的字段必須有g(shù)etter和setter方法
- Flink必須支持字段的類型檐嚣。目前助泽,F(xiàn)link使用Avro序列化任意對象(例如Date)。
Flink分析POJO類型的結(jié)構(gòu)嚎京,即它了解POJO的字段嗡贺。因此,POJO類型比一般類型更容易使用挖藏。此外暑刃,F(xiàn)link可以比一般類型更有效地處理POJO。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
原始類型
Flink支持所有Java和Scala原語類型膜眠,如Integer岩臣,String和Double。
一般類型
Flink支持大多數(shù)Java和Scala類(API和自定義)宵膨。限制適用于包含無法序列化的字段的類架谎,如文件指針,I / O流或其他本機(jī)資源辟躏。遵循Java Beans約定的類通彻瓤郏可以很好地工作。
所有未標(biāo)識為POJO類型的類(請參閱上面的POJO要求)都由Flink作為常規(guī)類類型處理。Flink將這些數(shù)據(jù)類型視為黑盒子会涎,并且無法訪問其內(nèi)容(即裹匙,用于有效排序)。使用序列化框架Kryo對常規(guī)類型進(jìn)行反序列化末秃。
值
值類型手動描述其序列化和反序列化概页。它們不是通過通用序列化框架,而是通過使用讀取和寫入方法實現(xiàn)org.apache.flinktypes.Value接口來為這些操作提供自定義代碼练慕。當(dāng)通用序列化效率非常低時惰匙,使用值類型是合理的。一個示例是將元素的稀疏向量實現(xiàn)為數(shù)組的數(shù)據(jù)類型铃将。知道數(shù)組大部分為零项鬼,可以對非零元素使用特殊編碼,而通用序列化只需編寫所有數(shù)組元素劲阎。
org.apache.flinktypes.CopyableValue接口以類似的方式支持手動內(nèi)部克隆邏輯绘盟。
Flink帶有與基本數(shù)據(jù)類型對應(yīng)的預(yù)定義值類型。(ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)這些Value類型充當(dāng)基本數(shù)據(jù)類型的可變變體:它們的值可以更改悯仙,允許程序員重用對象并從垃圾收集器中減輕壓力奥此。
Hadoop Writables
您可以使用實現(xiàn)org.apache.hadoop.Writable接口的類型。 write()和readFields()方法中定義的序列化邏輯將用于序列化雁比。
特殊類型
您可以使用特殊類型稚虎,包括Scala的Either,Option和Try偎捎。 Java API有自己的自定義Either實現(xiàn)蠢终。與Scala的Either類似,它代表兩種可能類型的值茴她,左或右寻拂。兩者都可用于錯誤處理或需要輸出兩種不同類型記錄的運(yùn)算符。
類型擦除和類型推斷
僅針對Java
Java編譯器在編譯后拋棄了大部分泛型類型信息丈牢。這在Java中稱為類型擦除祭钉。這意味著在運(yùn)行時,對象的實例不再知道其泛型類型己沛。例如慌核,DataStream <String>和DataStream <Long>的實例與JVM看起來相同。
Flink在準(zhǔn)備執(zhí)行程序時(當(dāng)調(diào)用程序的主要方法時)需要類型信息申尼。Flink Java API嘗試重建以各種方式丟棄的類型信息垮卓,并將其顯式存儲在數(shù)據(jù)集和運(yùn)算符中。您可以通過DataStream.getType()檢索類型师幕。該方法返回TypeInformation的一個實例粟按,這是Flink表示類型的內(nèi)部方式。
類型推斷有其局限性,在某些情況下需要編程人員的“合作”灭将。這方面的示例是從集合創(chuàng)建數(shù)據(jù)集的方法疼鸟,例如ExecutionEnvironment.fromCollection(),你可以在其中傳遞描述類型的參數(shù)庙曙。但是像MapFunction <I愚臀,O>這樣的通用函數(shù)也可能需要額外的類型信息。
ResultTypeQueryable接口可以通過輸入格式和函數(shù)來實現(xiàn)矾利,以明確告知API有關(guān)其返回類型的信息。調(diào)用函數(shù)的輸入類型通巢鐾啵可以通過先前操作的結(jié)果類型來推斷男旗。
累加器和計數(shù)器
累加器是具有添加操作和最終累積結(jié)果的簡單構(gòu)造,可在作業(yè)結(jié)束后使用欣鳖。
最直接的累加器是一個計數(shù)器:您可以使用Accumulator.add(V值)方法遞增它察皇。在工作結(jié)束時,F(xiàn)link將匯總(合并)所有部分結(jié)果并將結(jié)果發(fā)送給客戶泽台。在調(diào)試過程中什荣,或者如果你想快速了解有關(guān)數(shù)據(jù)的更多信息,累加器非常有用怀酷。
link目前有以下內(nèi)置累加器稻爬。它們中的每一個都實現(xiàn)了Accumulator接口。
- IntCounter, LongCounter and DoubleCounter: 有關(guān)使用計數(shù)器的示例蜕依,請參見下文桅锄。
- Histogram: 離散數(shù)量的箱的直方圖實現(xiàn)。在內(nèi)部样眠,它只是一個從Integer到Integer的映射友瘤。你可以使用它來計算值的分布,例如字?jǐn)?shù)統(tǒng)計程序的每行字?jǐn)?shù)分布檐束。
如何使用累加器
首先辫秧,您必須在要使用它的用戶定義轉(zhuǎn)換函數(shù)中創(chuàng)建累加器對象(此處為計數(shù)器)。
private IntCounter numLines = new IntCounter();
其次被丧,你必須注冊累加器對象盟戏,通常在rich function的open()方法中。在這里你還可以定義名稱甥桂。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
你現(xiàn)在可以在運(yùn)算符函數(shù)中的任何位置使用累加器抓半,包括open()和close()方法。
this.numLines.add(1);
整個結(jié)果將存儲在JobExecutionResult對象中格嘁,該對象是從執(zhí)行環(huán)境的execute()方法返回的(當(dāng)前這僅在執(zhí)行等待作業(yè)完成時才有效)笛求。
myJobExecutionResult.getAccumulatorResult("num-lines")
所有累加器每個作業(yè)共享一個命名空間。因此,您可以在作業(yè)的不同操作函數(shù)中使用相同的累加器探入。 Flink將在內(nèi)部合并所有具有相同名稱的累加器狡孔。
關(guān)于累加器和迭代的注釋:目前累加器的結(jié)果僅在整個作業(yè)結(jié)束后才可用。我們還計劃在下一次迭代中使前一次迭代的結(jié)果可用蜂嗽。您可以使用聚合器來計算每次迭代統(tǒng)計信息苗膝,并根據(jù)此類統(tǒng)計信息確定迭代的終止。
自定義累加器
要實現(xiàn)自己的累加器植旧,只需編寫Accumulator接口的實現(xiàn)即可辱揭。如果您認(rèn)為您的自定義累加器應(yīng)與Flink一起提供,請隨意創(chuàng)建拉取請求病附。
你可以選擇實現(xiàn)Accumulator 或者 SimpleAccumulator问窃。
Accumulator<V,R> 最便捷:它為要添加的值定義類型V,為最終結(jié)果定義結(jié)果類型R.例如完沪。對于直方圖域庇,V是數(shù)字,R是直方圖覆积。SimpleAccumulator 適用于兩種類型相同的情況听皿,比如計數(shù)器