Data Sources
源是程序讀取輸入數(shù)據(jù)的位置≈辉酰可以使用 StreamExecutionEnvironment.addSource(sourceFunction)
將源添加到程序。Flink 有許多預(yù)先實現(xiàn)的源函數(shù)香府,也可以通過實現(xiàn) SourceFunction
方法自定義非并行源 狈茉,或通過實現(xiàn) ParallelSourceFunction
或擴展 RichParallelSourceFunction
自定義并行源。
有幾個預(yù)定義的流數(shù)據(jù)源可從 StreamExecutionEnvironment
訪問:
基于文件:
-
readTextFile(path)
逐行讀取文本文件(文件符合TextInputFormat
格式)劣欢,并作為字符串返回每一行。
readFile(fileInputFormat, path)
按指定的文件輸入格式(fileInputFormat)讀取指定路徑的文件裁良。readFile(fileInputFormat, path, watchType, interval, pathFilter)
前兩個方法的內(nèi)部調(diào)用方法凿将。根據(jù)給定文件格式(fileInputFormat)讀取指定路徑的文件。根據(jù) watchType价脾,定期監(jiān)聽路徑下的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY
)牧抵,或者處理當前在路徑中的數(shù)據(jù)并退出(FileProcessingMode.PROCESS_ONCE
)。使用 pathFilter侨把,可以進一步排除正在處理的文件犀变。
基于Socket:
-
socketTextStream
從 Socket 讀取,元素可以用分隔符分隔秋柄。
基于集合:
fromCollection(Seq)
用 Java.util.Collection 對象創(chuàng)建數(shù)據(jù)流获枝。集合中的所有元素必須屬于同一類型。fromCollection(Iterator)
用迭代器創(chuàng)建數(shù)據(jù)流骇笔。指定迭代器返回的元素的數(shù)據(jù)類型省店。fromElements(elements: _*)
從給定的對象序列創(chuàng)建數(shù)據(jù)流机隙。所有對象必須屬于同一類型。fromParallelCollection(SplittableIterator)
并行地從迭代器創(chuàng)建數(shù)據(jù)流萨西。指定迭代器返回的元素的數(shù)據(jù)類型有鹿。generateSequence(from, to)
并行生成給定間隔的數(shù)字序列。
自定義:
-
addSource
附加新的源函數(shù)谎脯。例如葱跋,要從 Apache Kafka 中讀取,可以使用addSource(new FlinkKafkaConsumer08<>(...))
源梭。請詳細查看 連接器娱俺。
DataStream Transformation
轉(zhuǎn)換函數(shù)
Map
DataStream -> DataStream,一個數(shù)據(jù)元生成一個新的數(shù)據(jù)元废麻。
將輸入流的元素翻倍:
dataStream.map { x => x * 2 }
FlatMap
DataStream -> DataStream荠卷,一個數(shù)據(jù)元生成多個數(shù)據(jù)元(可以為0)。將句子分割為單詞:
dataStream.flatMap { str => str.split(" ") }
Filter
DataStream -> DataStream烛愧,每個數(shù)據(jù)元執(zhí)行布爾函數(shù)油宜,只保存函數(shù)返回 true 的數(shù)據(jù)元。過濾掉零值的過濾器:
dataStream.filter { _ != 0 }
KeyBy
DataStream -> KeyedStream怜姿,將流劃分為不相交的分區(qū)慎冤。具有相同 Keys 的所有記錄在同一分區(qū)。指定 key 的取值:
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream -> DataStream沧卢,KeyedStream 元素滾動執(zhí)行 Reduce蚁堤。將當前數(shù)據(jù)元與最新的一個 Reduce 值組合作為新值發(fā)送。創(chuàng)建 key 的值求和:
keyedStream.reduce { _ + _ }
Fold
KeyedStream -> DataStream但狭,具有初始值的 Reduce披诗。將當前數(shù)據(jù)元與最新的一個 Reduce 值組合作為新值發(fā)送。當應(yīng)用于序列(1,2,3,4,5)時立磁,發(fā)出序列"start-1"呈队,"start-1-2","start-1-2-3", ...:
keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations
KeyedStream -> DataStream息罗,應(yīng)用于 KeyedStream 上的滾動聚合掂咒。min
和 minBy
的區(qū)別是是 min
返回最小值才沧,minBy
具有最小值的數(shù)據(jù)元(max
和 maxBy
同理):
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream -> WindowedStream迈喉,Windows 可以在已經(jīng)分區(qū)的 KeyedStream 上定義。Windows 根據(jù)某些特征(例如温圆,在最近5秒內(nèi)到達的數(shù)據(jù))對每個Keys中的數(shù)據(jù)進行分組挨摸。更多說明參考 Windows 或 譯版。
// Last 5 seconds of data
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
WindowAll
DataStream -> AllWindowedStream岁歉,Windows 也可以在 DataStream 上定義得运。在許多情況下膝蜈,這是非并行轉(zhuǎn)換。所有記錄將收集在 windowAll 算子的一個任務(wù)中熔掺。
// Last 5 seconds of data
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
Window Apply
WindowedStream -> DataStream 或 AllWindowedStream -> DataStream饱搏,將函數(shù)應(yīng)用于整個窗口。一個對窗口數(shù)據(jù)求和:
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream -> DataStream置逻,Reduce 函數(shù)應(yīng)用于窗口并返回結(jié)果值推沸。
windowedStream.reduce { _ + _ }
Window Fold
WindowedStream -> DataStream,F(xiàn)old 函數(shù)應(yīng)用于窗口并返回結(jié)果值券坞。當函數(shù)應(yīng)用于窗口的序列(1,2,3,4,5)時鬓催,發(fā)送出 "start-1-2-3-4-5":
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows
WindowedStream -> DataStream,聚合窗口的內(nèi)容:
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* -> DataStream恨锚,兩個或多個數(shù)據(jù)流的合并宇驾,創(chuàng)建包含來自所有流的所有數(shù)據(jù)元的新流。如果將數(shù)據(jù)流與自身聯(lián)合猴伶,則會在結(jié)果流中獲取兩次數(shù)據(jù)元课舍。
dataStream.union(otherStream1, otherStream2, ...)
Window Join
DataStream,DataStream -> DataStream,Join 連接兩個流他挎,指定 Key 和窗口布卡。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Window CoGroup
DataStream,DataStream -> DataStream,CoGroup 連接兩個流雇盖,指定 Key 和窗口忿等。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
CoGroup 與 Join 的區(qū)別:
CoGroup 會輸出未匹配的數(shù)據(jù),Join 只輸出匹配的數(shù)據(jù)
Connect
DataStream,DataStream -> ConnectedStreams崔挖,連接兩個有各自類型的數(shù)據(jù)流贸街。允許兩個流之間的狀態(tài)共享。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
可用于數(shù)據(jù)流關(guān)聯(lián)配置流
CoMap, CoFlatMap
ConnectedStreams -> DataStream狸相,作用域連接數(shù)據(jù)流(connected data stream)上的 map
和 flatMap
:
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
Split
DataStream -> SplitStream薛匪,將數(shù)據(jù)流拆分為兩個或更多個流。
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
Select
SplitStream -> DataStream脓鹃,從 SpliteStream 中選擇一個流或多個流逸尖。
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate
DataStream -> IterativeStream -> DataStream,將一個算子的輸出重定向到某個先前的算子瘸右,在流中創(chuàng)建 feedback
循環(huán)娇跟。這對于定義不斷更新模型的算法特別有用。以下代碼以流開頭并連續(xù)應(yīng)用迭代體太颤。大于0的數(shù)據(jù)元將被發(fā)送回 feedback
苞俘,其余數(shù)據(jù)元將向下游轉(zhuǎn)發(fā)。
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
Extract Timestamps
DataStream -> DataStream龄章,從記錄中提取時間戳吃谣,以便使用事件時間窗口乞封。
stream.assignTimestamps (new TimeStampExtractor() {...});
Project
DataStream -> DataStream,作用于元組的轉(zhuǎn)換岗憋,從元組中選擇字段的子集肃晚。
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
分區(qū)函數(shù)
Custom partitioning
DataStream -> DataStream,使用自定義的分區(qū)函數(shù)(Partitioner)為每個數(shù)據(jù)元選擇目標分區(qū)和所在任務(wù)仔戈。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
Random partitioning
DataStream -> DataStream陷揪,隨機均勻分布分配數(shù)據(jù)元。
dataStream.shuffle();
Rebalancing (Round-robin partitioning)
DataStream -> DataStream杂穷,輪詢?yōu)閿?shù)據(jù)元分區(qū)悍缠,每個分區(qū)創(chuàng)建相等的負載。在存在數(shù)據(jù)偏斜時用于性能優(yōu)化耐量。
dataStream.rebalance()
Rescaling
DataStream -> DataStream飞蚓,根據(jù)上下游的分區(qū)數(shù)量,輪詢?yōu)閿?shù)據(jù)元分區(qū)廊蜒。
dataStream.rescale();
建議使用
rescale
替代rebalance
趴拧。
例如,上游是5個并發(fā)山叮,下游是10個并發(fā)著榴。當使用 Rebalance 時,上游每個并發(fā)會輪詢發(fā)給下游10個并發(fā)屁倔。當使用 Rescale 時脑又,上游每個并發(fā)只需輪詢發(fā)給下游2個并發(fā),能提高網(wǎng)絡(luò)效率锐借。
當上游的數(shù)據(jù)比較均勻時问麸,且上下游的并發(fā)數(shù)成比例時,可以使用 Rescale 替換 Rebalance钞翔。參數(shù):enable.rescale.shuffling=true严卖,默認關(guān)閉。
Broadcasting
DataStream -> DataStream布轿,向每個分區(qū)廣播數(shù)據(jù)元哮笆。
dataStream.broadcast()
Task chaining and resource groups
Chaining 兩個后續(xù)轉(zhuǎn)換意味著將它們定位在同一個線程中以獲得更好的性能。Flink 默認會鏈接一些算子(例如汰扭,連續(xù)兩個的 map
轉(zhuǎn)換)稠肘。API可以對鏈接進行細粒度控制:
使用 StreamExecutionEnvironment.disableOperatorChaining()
可以禁用整個工作的算子鏈接。對于更細粒度的控制东且,可以使用以下函數(shù)启具。(這些函數(shù)只能在 DataStream 轉(zhuǎn)換后立即使用本讥。例如珊泳,可以使用 someStream.map(...).startNewChain()
鲁冯,但不能使用 someStream.startNewChain()
)
Resource group 是 Flink 中的一個 slot。如果需要色查,可以在單獨的 slot 中手動隔離算子薯演。
Start new chain
從這個算子開始,開始一個新的鏈秧了。兩個 map
將被鏈接跨扮,filter
將不會在鏈接當中。
someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining
不要鏈接 map
算子
someStream.map(...).disableChaining()
Set slot sharing group
設(shè)置算子操作的 slot sharing验毡。將把具有相同 slot sharing 的算子操作放入同一個 slot衡创,同時保證其他 slot 中沒有 slot sharing 的算子操作【ǎ可用于隔離 slot璃氢。默認 slot sharing group 的名稱為"default",可以通過調(diào)用 slotSharingGroup("groupName")
將算子操作顯式放入此組中狮辽。
someStream.filter(...).slotSharingGroup("name")
Data Sinks
Data Sink 消費 DataStream 并轉(zhuǎn)發(fā)到文件一也,套接字,外部系統(tǒng)或打印到頁面喉脖。Flink 帶有各種內(nèi)置輸出格式椰苟,封裝在 DataStreams 上的算子操作后面:
writeAsText() / TextOutputFormat
, 按字符串順序?qū)懭胛募Mㄟ^調(diào)用每個元素的toString()
方法獲得字符串树叽。writeAsCsv(...) / CsvOutputFormat
舆蝴,將元組寫為逗號分隔的形式寫入文件。行和字段分隔符是可配置的题诵。每個字段的值來自對象的toString()
方法须误。print() / printToErr()
,在標準輸出/標準錯誤流上打印每個元素的toString()
值仇轻【┝。可以定義輸出前綴,這有助于區(qū)分不同的打印調(diào)用篷店。如果并行度大于1祭椰,輸出也包含生成輸出的任務(wù)的標識符。writeUsingOutputFormat() / FileOutputFormat
疲陕,自定義文件輸出的方法和基類方淤。支持自定義對象到字節(jié)的轉(zhuǎn)換。writeToSocket
蹄殃,將元素寫入 Socket携茂,使用SerializationSchema
進行序列化。addSink
诅岩,調(diào)用自定義接收器函數(shù)讳苦。請詳細查看 連接器带膜。
DataStream 的 write*()
方法主要用于調(diào)試目的。他們沒有參與 Flink checkpoint鸳谜,這意味著這些函數(shù)通常具有至少一次的語義膝藕。刷新到目標系統(tǒng)的數(shù)據(jù)取決于 OutputFormat 的實現(xiàn),并非所有發(fā)送到 OutputFormat 的數(shù)據(jù)都會立即顯示在目標系統(tǒng)中咐扭。此外芭挽,在失敗的情況下,這些記錄可能會丟失蝗肪。
要將流可靠袜爪、準確地傳送到文件系統(tǒng),請使用 flink-connector-filesystem薛闪。通過 .addSink(...)
方法的自定義實現(xiàn)饿敲,可以實現(xiàn)在 checkpoint 中精確一次的語義。
Iterations
迭代流程序?qū)⒑瘮?shù)嵌入到 IterativeStream逛绵。由于 DataStream 程序可能永遠不會完成怀各,因此沒有最大迭代次數(shù)。相反术浪,需要指定流的哪個部分反饋到迭代瓢对,哪個部分使用 split 或 filter 轉(zhuǎn)發(fā)到下游。
下面是一個示例迭代胰苏,其中主體(重復(fù)的計算部分)是一個簡單的 map
轉(zhuǎn)換硕蛹,反饋的元素由使用過濾器向下游轉(zhuǎn)發(fā)的元素區(qū)分。
val iteratedStream = someDataStream.iterate(
iteration => {
val iterationBody = iteration.map(/* this is executed many times */)
(iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})
例如硕并,從一系列整數(shù)中連續(xù)減去1直到它們達到零的程序:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(
iteration => {
val minusOne = iteration.map( v => v - 1)
val stillGreaterThanZero = minusOne.filter (_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
}
)
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html