基本的API概念
Flink程序是實現(xiàn)分布式集合轉(zhuǎn)換操作(如:過濾典奉、映射馍悟、更改狀態(tài)赃承、join崭孤、分組、定義窗口剪撬、聚合等)的有規(guī)律的程序扛施。集合最初是由sources(數(shù)據(jù)源)(例如: 從文件中讀取、kafka topic泻轰、或者來自本地、內(nèi)存中的集合等)創(chuàng)建的, 結(jié)果通過sink輸出脏答,可能是將數(shù)據(jù)寫入文件中糕殉,或者以標(biāo)準(zhǔn)輸出的形式輸出(例如:輸出到控制臺)。Flink程序可以以不同的形式運(yùn)行——以獨(dú)立的形式運(yùn)行或者嵌入到其他程序中執(zhí)行殖告。執(zhí)行的動作可以發(fā)生在本地,也可以發(fā)生在多臺機(jī)器構(gòu)成的集群中雳锋。
根據(jù)數(shù)據(jù)源類型的不同黄绩,可以是有界的或者無界的,你可以寫批處理程序或者流處理程序玷过,其中DataSet API是提供給批處理程序用的爽丹,而DataStream API是提供給流處理程序用的。本指南將介紹這兩種API共同的基礎(chǔ)概念辛蚊,但是具體些程序的時候粤蝎,請參考具體的流處理指南和批處理指南。
當(dāng)展示實際的例子來闡釋API是如何被使用時袋马,我們將使用StreamingExecutionEnvironment
和DataStream API
初澎,這些概念在DataSet API中也是一樣的,僅僅是替換成ExecutionEnvironment
和DataSet
而已虑凛。
數(shù)據(jù)集和數(shù)據(jù)流(DataSet and DataStream )
Flink使用DataSet
和DataStream
這兩個特殊的類來表示程序中的數(shù)據(jù)碑宴,你可以將它們想象成一個包含重復(fù)數(shù)據(jù)的不可變數(shù)據(jù)集合,其中DataSet
的數(shù)據(jù)是有限的而DataStream
中的數(shù)據(jù)個數(shù)則是無限的桑谍。
這些集合在某些關(guān)鍵情況下跟常規(guī)的Java集合是不同的延柠。首先,它們是不可變的锣披,也就是說一旦你創(chuàng)建了贞间,你就再也不能添加或者刪除了,同時你也不能簡單的查看里面的數(shù)據(jù)雹仿。
一個集合最初是由Flink程序中的Source創(chuàng)建的增热,之后新的集合則是由最初的集合通過調(diào)用API的方法轉(zhuǎn)換而來的,例如:map盅粪、filter等钓葫。
剖析Flink程序(Anatomy of a Flink Program)
Flink程序看起來像似常規(guī)程序中的數(shù)據(jù)集轉(zhuǎn)換操作。每個程序都是由相同的基礎(chǔ)部分構(gòu)成:
1票顾、 獲取一個execution environment
2础浮、 拉取或者創(chuàng)建一個初始數(shù)據(jù)集
3帆调、 指定數(shù)據(jù)集的轉(zhuǎn)換操作
4、 指定計算結(jié)果保存在哪
5豆同、 觸發(fā)程序執(zhí)行
現(xiàn)在我們給出每一個步驟的概述番刊,具體請參考各個部分的詳細(xì)信息。注意影锈,Scala DataSet API中的核心代碼都可以在這個包下面找到org.apache.flink.api.scala
,而所有的Scala DataStream API 的核心代碼都可以在org.apache.flink.streaming.api.scala
這個包下找到芹务。
StreamExecutionEnvironment
是所有流式Flink程序的基礎(chǔ)。你可以通過StreamExecutionEnvironment
的靜態(tài)方法來獲取:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常情況下鸭廷,你僅需要調(diào)用getExecutionEnvironment()
就可以了枣抱,因為這個函數(shù)將會根據(jù)上下文去創(chuàng)建正確的ExecutionEnvironment
,如果你在IDE中執(zhí)行程序或者將程序作為一個常規(guī)的Java/Scala程序執(zhí)行辆床,那么它將為你創(chuàng)建一個本地的環(huán)境佳晶,你的程序?qū)⒃诒镜貓?zhí)行。如果你將你的程序打成jar包讼载,并通過命令行調(diào)用它轿秧,那么Flink集群管理器將執(zhí)行你的main
方法并且getExecutionEnvironment()
方法將為你的程序在集群中執(zhí)行生成一個執(zhí)行環(huán)境。
對于指定的數(shù)據(jù)源咨堤,執(zhí)行環(huán)境有一些方法來以不同的方式讀取文件中的數(shù)據(jù):你可以一行一行的讀取菇篡,如CSV文件讀取,或者用自定義的數(shù)據(jù)輸入格式讀取一喘。為了以一系列行數(shù)來讀取一個文本文件驱还,你可以使用如下方法:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
這個將會產(chǎn)生一個DataStream給你,你可以在這個DataStream中使用transformation操作來創(chuàng)建新的DataStream津滞。
你可以使用DataSet的轉(zhuǎn)換操作通過調(diào)用DataSet中的transformation函數(shù)铝侵。例如:一個map轉(zhuǎn)換操作如下:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
這里將創(chuàng)建一個新的DataStream,通過將原始集合中的每一個String類型轉(zhuǎn)換成Integer類型触徐。
一旦你有了一個包含你最終結(jié)果的DataStream咪鲜,你就可以創(chuàng)建一個sink來將它保存到外部系統(tǒng)了,這里有一些創(chuàng)建sink的簡單例子:
writeAsText(path: String)
print()
一旦你的程序已經(jīng)完成撞鹉,你需要調(diào)用StreamExecutionEnvironment
中的execute()
方法來觸發(fā)程序的執(zhí)行疟丙。根據(jù)ExecutionEnvironment
的類型不同,程序可能在本地觸發(fā)執(zhí)行或者將程序分發(fā)到集群中去執(zhí)行鸟雏。
?
execute()
方法返回JobExecutionResult
結(jié)果享郊,JobExecutionResult
包含了執(zhí)行次數(shù)以及累加器的結(jié)果。
請查看Streaming 指南來獲取關(guān)于流數(shù)據(jù)的source和sink的信息孝鹊,以及關(guān)于DataStream所支持的transformation的更深入的信息炊琉。
?
請查看Batch指南來獲取關(guān)于批數(shù)據(jù)的Source和sink信息,已經(jīng)關(guān)于DataSet所支持的transformation的更深入的信息。
延遲計算(Lazy Evaluation)
所有的Flink程序都是延遲計算的:當(dāng)程序的main方法執(zhí)行的時候苔咪,數(shù)據(jù)的加載及transformation操作都不會直接的執(zhí)行锰悼。相反,所有的操作的創(chuàng)建及執(zhí)行都是添加到程序的執(zhí)行計劃中团赏,所有的操作都是在ExecutionEnvironment(執(zhí)行環(huán)境)調(diào)用execute()
方法觸發(fā)執(zhí)行后才真正的去執(zhí)行箕般。而程序是在本地執(zhí)行還是在集群中執(zhí)行取決于ExecutionEnvironment(執(zhí)行環(huán)境)的類型。
Flink的延遲計算讓我們能夠構(gòu)造復(fù)雜的程序舔清,而Flink則把這個程序當(dāng)做一個完整的計劃單元去執(zhí)行丝里。
指定Key(Specifying Keys)
有些transformation(例如:join、coGroup体谒、keyBy杯聚、groupBy) 需要一個在數(shù)據(jù)集中定義的key,而其他的transformation(例如:Reduce抒痒、GroupReduce械媒、Aggregate、Window) 則允許在調(diào)用它們之前對數(shù)據(jù)進(jìn)行按key分組處理评汰。
DataSet可以按如下方式進(jìn)行分組處理:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
而DataStream則可以按以下方式進(jìn)行分組處理:
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink的數(shù)據(jù)模型不是基于鍵值對的,所以你無需在物理上將數(shù)據(jù)集打包成鍵值對形式痢虹。Key是虛擬的:在真實數(shù)據(jù)上作為函數(shù)來定義是為了指導(dǎo)分組操作被去。
注意:接下來的討論中,我們將使用DataStream和keyBy來進(jìn)行奖唯,而對于DataSet API惨缆,你只需要替換成DataSet和groupBy即可。
為元組定義key(Define keys for Tuples)
最簡單的情況是根據(jù)元組上的一個或者多個字段對元組進(jìn)行分組丰捷。
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)
這個元組根據(jù)第一個字段進(jìn)行分組
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)
這里我們根據(jù)元組的第一個字段和第二個字段組成的復(fù)合key來對元組進(jìn)行分組坯墨。
嵌套元組:如果你有一個嵌套元組的DataStream如下:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)的話將會導(dǎo)致系統(tǒng)使用整個Tuple2作為key(由一個Integer和一個Float作為key)。如果你想瀏覽嵌套的Tuple2病往,你需要使用接下來要闡述的字段表達(dá)式鍵捣染。
使用字段表達(dá)式定義key(Define keys use Field Expressions)
你可以使用嵌套字段中的String類型的字段來為group
、sort
停巷、join
或者coGroup
等操作定義key耍攘。字段表達(dá)式使得像Tuple、POJO等這些復(fù)雜類型的字段選擇更加容易畔勤。
在下面的例子中蕾各,我們有一個擁有兩個字段”word”和”count”的POJO類wc,為了根據(jù)字段word來做分組庆揪,我們只需要將字段的名稱傳入keyBy()
方法中即可式曲。
// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
字段表達(dá)式語法(Field Expression Syntax):
1、通過字段的名稱來選擇POJO字段缸榛,例如:”user”是指POJO類型的”user”字段吝羞。
2兰伤、通過元組的0-偏移位的字段名或者0-偏移位的字段索引就可以選擇元組的字段,例如:”_1”和”5”分別表示一個Scala元組類型的第一個字段和第6個字段脆贵。
3医清、你也可以選擇POJO和Tuple中的嵌套字段,例如:”user.zip”代表著一個保存在”user”這個POJO類中的名為”zip”的POJO字段卖氨。任意嵌套和混合的POJO和Tuple也是支持的会烙,例如:”_2.user.zip”或者”user.4.1.zip”
4、你也可以使用通配符””來選擇所有的類型筒捺,但是這種不適用于非Tuple或者POJO類型柏腻。
字段表達(dá)式例子
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
這里是上面代碼中的一些正確的字段表達(dá)式:
“count”
:WC類中的count字段
“complex”
:遞歸地選擇POJO類ComplexNestedClass
中的所有字段
“complex.word._3”
:選擇嵌套造ComplexNestedClass
中的Tuple3
類型的word的最后一個元素
“complex.hadoopCitizen”
:選擇ComplexNestedClass
中類型為Integer
的HadoopCitizen
字段
使用Key選擇函數(shù)來定義key(Define keys using Key Selector Functions)
另一種定義key的方式是使用”key selector”函數(shù),一個”key selector”函數(shù)把一個元素作為輸入系吭,并產(chǎn)生這個元素的key五嫂。這個key可以是任意類型并來自任意計算。
下面的例子中展示了一個返回一個對象中的字段的key selector函數(shù):
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
指定Transformation函數(shù)(Specifying Transformation Function)
大部分的transformation都需要用戶自定義函數(shù)肯尺,這部分將列出不同的方式來展示這些函數(shù)是如何被展示的沃缘。
Lambda表達(dá)式函數(shù)(Lambda Functions)
正如前面的例子所見,所有的操作都支持lambda表達(dá)式來描述這些操作:
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
富函數(shù)(Rich Functions)
所有作為參數(shù)傳給Lambda函數(shù)的操作都可以作為參數(shù)傳給富函數(shù)则吟,例如:不同于
data.map { x => x.toInt }
你可以寫成這個樣子:
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
并將其傳入map的轉(zhuǎn)換操作中:
data.map(new MyMapFunction())
富函數(shù)也可以以匿名內(nèi)部類的形式定義:
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
富函數(shù)提供除了用戶定義函數(shù)(如:map, reduce
等)之外槐臀,還提供了四個函數(shù):open,close氓仲,getRuntimeContext和setRuntimeContext
水慨。這些對于參數(shù)化函數(shù)(參考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#passing-parameters-to-functions),創(chuàng)建和初始化本地狀態(tài),獲取廣播變量(參見: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#broadcast-variables)以及獲取運(yùn)行時信息敬扛,例如:累加器和計數(shù)器(參見: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#accumulators--counters),獲取迭代信息(參見: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/iterations.html)是非常有用的晰洒。
支持的數(shù)據(jù)類型(Supported Data Type)
Flink對DataSet和DataStream中的數(shù)據(jù)元素的類型做了一些限制,原因是為了讓系統(tǒng)能夠察覺到數(shù)據(jù)的類型以便采取更有效的執(zhí)行策略啥箭。
這里有6中不同的數(shù)據(jù)類型:
1谍珊、 Java Tuple和Scala的case class類型
2、 Java的POJO類型
3捉蚤、 原生數(shù)據(jù)類型
4抬驴、 常規(guī)類類型
5、 值類型
6缆巧、 Hadoop的Writable類
7布持、 一些特殊的類型
Tuple和Case Class
Scala的case class類(Scala的Tuple也是一種特殊的case class),是一個復(fù)合類型陕悬,包含了固定數(shù)量的不同類型的字段题暖。Tuple字段用1到偏移位置坐標(biāo)記,例如_1表示第一個字段。而case class則可以根據(jù)字段名稱來獲取:
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
POJO
Java和Scala的類如果滿足下面的要求的話胧卤,F(xiàn)link都會把它們當(dāng)做特殊的POJO來處理:
1唯绍、 class必須是public的
2、 必須有一個public的無參構(gòu)造函數(shù)
3枝誊、 所有的字段要么是public的况芒,要么必須能夠通過getter和setter函數(shù)能夠獲取得到,對于一個名叫foo的字段叶撒,它的getter和setter函數(shù)必須是getFoo()和setFoo()
4绝骚、 字段的類型必須是Flink能夠支持的,目前Flink使用Avro來序列化隨意對象(例如Date)
Flink分析POJO的類型結(jié)構(gòu)祠够,了解POJO的字段压汪,這樣POJO的類型使用起來就比使用泛型方便多了,此外Flink處理POJO的效率也會比處理泛型高。
下面的例子展示了一個有兩個字段的簡單POJO類:
class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
原生數(shù)據(jù)類型
Flink支持所有的Java和Scala原生類型古瓤,例如:Integer, String 和 Double等止剖。
常規(guī)類類型
Flink支持大部分的Java/Scala類,限制應(yīng)用于包含不能序列化字段落君,如:指針穿香、I/O流或者其他Native資源的類。所有遵循JavaBean規(guī)則的類都能很好的應(yīng)用于Flink中绎速。
所有不能當(dāng)做POJO處理的類都會被Flink當(dāng)做泛型類來處理扔水,F(xiàn)link把它們多做黑箱處理,并且獲取不到它們的內(nèi)容朝氓。泛型的序列化和反序列化使用的是Kryo序列化框架。
值類型
值類型都是手動描述它們的序列化和放序列化機(jī)制主届。它們通過自定義代碼赵哲,實現(xiàn)帶有read和write方法的org.apache.flinktypes.value
接口來實現(xiàn)序列化和反序列化,而不是使用通用的序列化反序列化框架君丁。當(dāng)通用序列化框架效率很低的時候使用值類型是很合理的枫夺,例如:一個實現(xiàn)稀疏向量元素的數(shù)組,數(shù)組大多數(shù)情況下都是0绘闷,我們可以使用特殊的編碼來表示非零元素橡庞,而通用序列化框架則是簡單的寫所有的數(shù)組元素。
這個org.apache.flinktypes.CopyableValue
接口也支持同樣的克隆邏輯印蔗。
Flink預(yù)定義的值類型與原生數(shù)據(jù)類型是一一對應(yīng)的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue
)扒最。這些值類型作為原生數(shù)據(jù)類型的可變變體,他們的值是可以改變的华嘹,允許程序重用對象從而緩解GC的壓力吧趣。
Hadoop的Writable類
你可以使用實現(xiàn)org.apache.hadoop.Writable
接口的類型,該類型的序列化邏輯在write()
方法中實現(xiàn)而反序列化邏輯則在readFields()
方法中實現(xiàn)。
特殊類型
你可以使用特殊的類型强挫,包括Scala的Either岔霸、Option和Try等,Java API也有自己實現(xiàn)的Either俯渤,跟Scala的Either類似呆细,它表示一個可能有兩種類型的值,Left和Right八匠。Either在錯誤處理和需要輸出兩個不同類型的記錄的操作中是非常有用的絮爷。
類型擦除和類型推理
注意:這部分僅對Java起作用!
Java編譯器在編譯之后會拋出很多的泛型信息臀叙,在Java中稱為擦除略水,也就意味著在執(zhí)行時,一個實例不在知道它的泛型信息劝萤。例如:DataStream<String>
和DataStream<Long>
在JVM中是一樣的渊涝。
Flink會在程序準(zhǔn)備執(zhí)行時(當(dāng)main方法被調(diào)用時),用到類型信息床嫌。Flink 的Java API會試圖去重建這些被丟棄的類型信息跨释,并將它們明確地存儲在數(shù)據(jù)集以及操作中。你可以通過DataStream.getType()
方法來獲取類型厌处,這個方法將返回一個TypeInformation的實例鳖谈,這個實例是Flink內(nèi)部表示類型的方式。
類型的推理有其局限性阔涉,在某些情況下需要程序員的協(xié)助(“cooperation”), 例如缆娃,通過集合創(chuàng)建的數(shù)據(jù)集蹲诀,如ExecutionEnvironment.fromCollection()
塔逃,你需要傳入一個描述類型的參數(shù)。但是同時對于泛型函數(shù)如MapFunction<I, O>
冬念,則需要額外的類型信息椭住。
可以實現(xiàn)ResultTypeQueryable
接口崇渗,通過輸入格式和函數(shù)來告訴API它們確切的返回類型。
函數(shù)調(diào)用的輸入類型通尘┲#可以由之前操作的結(jié)果類型來推斷宅广。
累加器和計數(shù)器
累加器是由一個加法操作和一個在作業(yè)運(yùn)行結(jié)束后可用的累加結(jié)果組成的簡單結(jié)構(gòu)。
最簡單直接的累加器是一個計數(shù)器些举,你可以調(diào)用Accumulator.add(V value)方法來累加它跟狱。在作業(yè)執(zhí)行結(jié)束后,F(xiàn)link會累加所有的部分結(jié)果户魏,并將結(jié)果返回給客戶端兽肤。累加器在debug階段或者你想快速的了解更多你的數(shù)據(jù)時是非常有用的套腹。
Flink目前有以下幾個內(nèi)置的累加器,每一個都實現(xiàn)了Acumulator接口:
IntCounter资铡、LongCounter
和DoubleCounter
:請往下看如何使用counter的結(jié)果电禀。
Histogram
:A histogram implementation for a discrete number of bins
.在內(nèi)部,它僅僅是Integer到Integer的映射笤休,你可以用這個來計算值的分布尖飞,例如 word count程序中每行單詞的分布。
如何使用累加器:
首先店雅,你需要在你需要用到累加器的自定義transformation函數(shù)中創(chuàng)建一個累加器對象(這里是計數(shù)器)
private IntCounter numLines = new IntCounter();
其次政基,你還需要注冊累加器對象,通常是在rich function的open()方法中闹啦。這里你還需要定義累加器的名字:
getRuntimeContext().addAccumulator("num-lines", this.numLines);
現(xiàn)在你可以在任何操作函數(shù)中來使用累加器了沮明,包括在open()和close()方法中。
this.numLines.add(1);
總的結(jié)果將保存在ExecutionEnvironment
的execute
方法返回的JobExecutionResult
對象中窍奋。
myJobExecutionResult.getAccumulatorResult("num-lines")
每一個作業(yè)中的所有累加器共享一個命名空間荐健,因此你可以在同一個作業(yè)的不同操作函數(shù)中使用同一個累加器,F(xiàn)link內(nèi)部會合并所有的累計器以同一個名字返回琳袄。
注意:對于累加器和計數(shù)器江场,當(dāng)前的累加器結(jié)果只有在整個作業(yè)結(jié)束后才可用,如果你想在每次迭代獲取前一次迭代的結(jié)果窖逗,你可以使用Aggregator來計算每次迭代的統(tǒng)計址否,以及基于上次迭代的最終結(jié)果來統(tǒng)計。
自定義累加器
為了實現(xiàn)你自己的累加器碎紊,你需要實現(xiàn)Accumulator接口佑附,如果你覺得你自定義的累加器需要被Flink收錄的話,請創(chuàng)建一個提交請求仗考。
你可以選擇實現(xiàn)Accumulator
或者SimpleAccumulator
Accumulator<V, R>
是最靈活的:它定義了需要進(jìn)行累加的值的類型V以及最后結(jié)果的類型R帮匾,例如:對于一個histogram,v是數(shù)值類型的而R是一個histogram痴鳄。SimpleAccumulator
則是在進(jìn)行累計數(shù)據(jù)類型和返回的數(shù)據(jù)類型一致的情況下使用的,例如計數(shù)器缸夹。