首先需要編程應(yīng)用的四層抽象:
最底下的一層對用戶是不可見的, 通過ProcessFunction集成到DataStream API. 我們的編程對象也都是DataStream API(bounded/unbounded streams) 和?DataSet API(bounded data sets)
FLink的應(yīng)用主要包括stream 和 transformation, 當(dāng)然DataSet也是被當(dāng)成Stream. 概念上講, stream指的是一條不會結(jié)束的數(shù)據(jù)記錄, transformation所指的操作是把一個或者多個stream作為輸入, 生成一個或者多個stream.?
所謂的Stream dataflows也是包括streams和transformation操作. 每個dataflow開始于一到多個source, 結(jié)束于一到多個sink. 這個過程可以類似的用DAG圖來表示, 不過確實(shí)通過一些迭代結(jié)構(gòu)允許生成環(huán), 但這種局部特例也可以忽略.
如下圖是一個典型的dataflow:
文字來描述的典型步驟的話是:
? ? 1 Obtain anexecution environment,
? ? 2 Load/create the initial data,
? ? 3 Specify transformations on this data,
? ? 4 Specify where to put the results of your computations,
? ? 5 Trigger the program execution
預(yù)定義的Connectors:
data sources: files, directories, sockets, 以及 collections和iterators的輸入數(shù)據(jù)
data sinks: files, stdout, stderr, sockets
目前所支持的與第三方對接的Connectors, 與flink打包在一起的包括:
Apache Kafka(source/sink)
Apache Cassandra(sink)
Amazon Kinesis Streams(source/sink)
Elasticsearch(sink)
Hadoop FileSystem(sink)
RabbitMQ(source/sink)
Apache NiFi(source/sink)
Twitter Streaming API(source)
通過Apache Bahir發(fā)布的streaming connectors:
Apache ActiveMQ(source/sink)
Apache Flume(sink)
Redis(sink)
Akka(sink)
Netty(source)
常用的Transformations:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html
1 Map, DataStream --> DataStream????
? ? 一對一操作, 輸入一個元素生成一個元素, 比如每個元素*2
? ? dataStream.map{x => x*2}
2 FlatMap, DataStream --> DataStream
? ? 一對多操作, 輸入一個元素, 生成0到多個元素, 比如字符串分詞, 生成詞數(shù)組????
? ? dataStream.flatMap{ w => w.split(" ")}
3 Filter, DataStream --> DataStream
? ? 通過一個bool function對元素進(jìn)行過濾, 保留為true的元素. 比如過濾為0的元素
? ? datastream.fliter{_ != 0}
4 KeyBy, DataStream --> KeyedStream??
? ? 通過hash partitioning的方法將一個stream變成一組不想交的patitions, 每個patitions包含的元素具有相同的key. 返回類型為keyedStream, 一組partitions
? ? ? dataStream.keyBy("someKey")
? ? ? ? ? ? case class WordWithCount(someKey: String, count: Long)????
? ? ? dataStream.keyBy(0) // Key by the first element of tuple
5 Reduce, KeyedStream --> DataStream
? ? Rolling reduce on a keyed data stream, 標(biāo)準(zhǔn)的reduce, 降維過程,?????
? ? keyedStream.reduce {_+_}
6 Fold, KeyedStream --> DataStream
滾動折疊, 合并當(dāng)前元素和上一個被折疊的值, 輸入值和返回值可以不一樣, 還沒有場景用到, 原文: A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
????valresult:DataStream[String] = keyedStream.fold("start")((str,i)=>{str+"-"+i})
7 Aggregations, KeyedStream --> DataStream?
滾動聚合, 很簡單, 一個需要注意的, min返回最小值, minBy返回?fù)碛凶钚≈档脑? 可以是多個
????keyedStream.sum(0), keyedStream.sum("key")
????keyedStream.min(0), keyedStream.min("key")
????keyedStream.max(0), keyedStream.max("key")
????keyedStream.minBy(0), keyedStream.minBy("key")
????keyedStream.maxBy(0), keyedStream.maxBy("key")
8 Window, KeyedStream --> AllWindowedStream
窗口可以被定義在已經(jīng)被分區(qū)的 KeyedStreams 上。窗口會對數(shù)據(jù)的每一個 key 根據(jù)一些特征(例如,在最近 5 秒中內(nèi)到達(dá)的數(shù)據(jù))進(jìn)行分組。查閱窗口了解關(guān)于窗口的完整描述赐俗。
? ??dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));// Last 5 seconds of data
9 WindowAll, DataStream --> AllWindowedStream
窗口可以被定義在 DataStream 上十绑。窗口會對所有數(shù)據(jù)流事件根據(jù)一些特征(例如糊闽,在最近 5 秒中內(nèi)到達(dá)的數(shù)據(jù))進(jìn)行分組娜谊。查閱窗口了解關(guān)于窗口的完整描述。
警告:這在許多案例中這是一種非并行的轉(zhuǎn)換诅蝶。所有的記錄都會被聚集到一個執(zhí)行 WindowAll 操作的 task 中,這是非常影響性能的募壕。????????
? ??dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));// Last 5 seconds of data
10 Window Apply: WindowedStream --> DataStream, AllWindowedStream --> DataStream
應(yīng)用一個一般的函數(shù)到窗口上调炬,窗口中的數(shù)據(jù)會作為一個整體被計(jì)算。下面的函數(shù)手工地計(jì)算了一個窗口中的元素總和舱馅。
注意:如果你正在使用一個 WindowAll 的轉(zhuǎn)換缰泡,你需要用 AllWindowFunction 來替換。
windowedStream.apply{WindowFunction}
// applying an AllWindowFunction on non-keyed window streamallWindowedStream.apply{AllWindowFunction}
11 Window Reduce, WindowedStream --> DataStream
應(yīng)用一個reduce函數(shù)到窗口上, 返回reduce的值
12 Window Fold, WindowedStream --> DataStream
應(yīng)用一個 fold 函數(shù)到窗口上代嗤,然后返回折疊后的值棘钞。 在窗口上將序列 (1,2,3,4,5) 轉(zhuǎn)換成 "start-1", "start-1-2", "start-1-2-3", ... 的一個 fold 函數(shù)長這個樣子:
valresult:DataStream[String]=windowedStream.fold("start",(str,i)=>{str+"-"+i})
13 Aggregations on windows, WindowedStream -> DataStream
聚合一個窗口中的內(nèi)容, min 與 minBy 的區(qū)別是 min 返回了最小值,而 minBy 返回了在這個字段上是最小值的所有元素(max 和 maxBy 也是同樣的)
14 Union, DataStream* --> DataStream
Union 兩個或多個數(shù)據(jù)流干毅,生成一個新的包含了來自所有流的所有數(shù)據(jù)的數(shù)據(jù)流宜猜。注意:如果你將一個數(shù)據(jù)流與其自身進(jìn)行了合并,在結(jié)果流中對于每個元素你都會拿到兩份硝逢。
dataStream.union(otherStream1,otherStream2,...)
15 Window Join, DataStream , DataStream --> DataStream
grades.join(salaries).
where(_.name).equalTo(_.name).
window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize))).
apply { (g,s) =>Person(g.name,g.grade,s.salary) }
16 Window CoGroup:? DataStream, DataStream --> DataStream
在一個給定的 key 和窗口上 co-group 兩個數(shù)據(jù)流dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(newCoGroupFunction(){...});
17 Connect: DataStream, DataStream --> ConnectedStreams
"連接"兩個數(shù)據(jù)流并保持原先的類型, Connect 可以讓兩條流之間共享狀態(tài).? ?
someStream:DataStream[Int]=...
otherStream:DataStream[String]=...
valconnectedStreams=someStream.connect(otherStream)
18 CoMap, CoFlatMap: ConnectedStreams --> DataStream
在一個 ConnectedStreams 上做類似 map 和 flatMap 的操作
connectedStreams.map(
(_:Int)=>true
,(_:String)=>false)
connectedStreams.flatMap((_:Int)=>true, (_:String)=>false)
19 Split & Select: DataStream --> SplitStream & SplitStream --> DataStream
根據(jù)規(guī)則把一個stream切分成多個Stream, 并且選擇.
針對基于元組的數(shù)據(jù)流, 可以只選取部分其中部分的字段.
val in:DataStream[(Int,Double,String)] = // [...]
val out = in.project(2,0)