To summarize: stream processing is real-time. Data arrives and is received into a buffer, and a processing node immediately pulls it from the buffer for processing. In batch processing, arriving data is serialized into the buffer but is not processed immediately. Flink supports both modes through a buffer timeout value; a timeout of 0 means pure stream processing.
Basic steps of a Flink program
1 Execution environment
First, obtain an existing execution environment or create a new one. There are three ways to get an execution environment:
(1) getExecutionEnvironment(): returns an environment based on the current context. In local mode it creates a local execution environment; in cluster mode it creates a distributed one.
(2) createLocalEnvironment(): creates a local execution environment.
(3) createRemoteEnvironment(String host, int port, String... jarFiles): creates a remote execution environment.
2 Data sources
Flink ships with many predefined data sources and also supports custom sources.
2.1 基于socket
DataStream API支持從socket讀取數(shù)據(jù)暖混,有如下3個(gè)方法:
socketTextStream(hostName, port)
socketTextStream(hostName, port, delimiter)
socketTextStream(hostName, port, delimiter, maxRetry)
2.2 基于文件
你可以使用readTextFile(String path)來消費(fèi)文件中的數(shù)據(jù)作為流數(shù)據(jù)的來源缕贡,默認(rèn)情況下的格式是TextInputFormat。當(dāng)然你也可以通過readFile(FileInputFormat inputFormat, String path)來指定FileInputFormat的格式。
Flink can also monitor files and read them as streams:
readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
3 Transformations
A transformation turns data from one form into another. The input can be one or more streams; the output can be zero, one, or several streams. The transformations are introduced one by one below.
3.1 Map
Takes one element and produces one element. Java API:
inputStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 5 * value;
    }
});
3.2 FlatMap
Takes one element and produces zero, one, or more elements. Java API:
inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
3.3 Filter
Used for conditional filtering; a record is emitted only when the predicate returns true:
inputStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 1;
    }
});
3.4 KeyBy
Logically partitions the stream by key, using hash partitioning internally; returns a KeyedStream:
inputStream.keyBy("someKey");
3.5 Reduce
On a KeyedStream, combines the previous reduce result with the current element; for example, a sum reduce:
keyedInputStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
3.6 Fold
Rolls the records of a KeyedStream into a running value, starting from an initial value; for example:
keyedInputStream.fold("Start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "=" + value;
    }
});
For the stream (1,2,3,4,5), the result is: Start=1=2=3=4=5
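The fold semantics can be reproduced in plain Java (no Flink dependency) to verify the result above; this is only a sketch of the rolling behavior, not the Flink runtime:

```java
import java.util.Arrays;
import java.util.List;

public class FoldSketch {
    // Plain-Java equivalent of the FoldFunction above: start from an
    // initial value and combine each element into the running result.
    static String fold(String initial, List<Integer> elements) {
        String current = initial;
        for (Integer value : elements) {
            current = current + "=" + value; // same body as the FoldFunction
        }
        return current;
    }

    public static void main(String[] args) {
        // Reproduces the (1,2,3,4,5) example from the text.
        System.out.println(fold("Start", Arrays.asList(1, 2, 3, 4, 5)));
        // → Start=1=2=3=4=5
    }
}
```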
3.7 Aggregations
Apply rolling aggregations such as min and max on a KeyedStream:
keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")
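The difference between min and minBy is that min tracks only the minimum of the given field, while minBy returns the whole element that carries the minimum. A plain-Java sketch of these semantics (the Pair type is a stand-in for a Flink tuple, not Flink API):

```java
import java.util.Arrays;
import java.util.List;

public class AggregationSketch {
    // A tiny two-field record standing in for a Flink tuple (hypothetical type).
    static class Pair {
        final String key;
        final int value;
        Pair(String key, int value) { this.key = key; this.value = value; }
    }

    // minBy keeps the whole element that carries the minimum value field.
    static Pair minBy(List<Pair> elements) {
        Pair best = elements.get(0);
        for (Pair p : elements) {
            if (p.value < best.value) best = p;
        }
        return best;
    }

    // sum aggregates one field across elements, as sum(...) does per key.
    static int sum(List<Pair> elements) {
        int total = 0;
        for (Pair p : elements) total += p.value;
        return total;
    }

    public static void main(String[] args) {
        List<Pair> data = Arrays.asList(new Pair("a", 3), new Pair("b", 1), new Pair("c", 2));
        System.out.println(minBy(data).key); // b
        System.out.println(sum(data));       // 6
    }
}
```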
4 Further reading
http://www.reibang.com/p/f9d447a3c48f
http://vinoyang.com/2016/06/22/flink-data-stream-partitioner/
Flink DataStream API Programming Guide: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html
5 Event time
Reference: https://blog.csdn.net/lmalds/article/details/52704170
Event time is the time that each individual event occurred on its producing device. This time is typically embedded within the records before they enter Flink, and that event timestamp can be extracted from each record. In event time, the progress of time depends on the data, not on any wall clocks. Event time programs must specify how to generate Event Time Watermarks, which is the mechanism that signals progress in event time. This watermarking mechanism is described in a later section, below.
In a perfect world, event time processing would yield completely consistent and deterministic results, regardless of when events arrive, or their ordering. However, unless the events are known to arrive in-order (by timestamp), event time processing incurs some latency while waiting for out-of-order events. As it is only possible to wait for a finite period of time, this places a limit on how deterministic event time applications can be.
Assuming all of the data has arrived, event time operations will behave as expected, and produce correct and consistent results even when working with out-of-order or late events, or when reprocessing historic data. For example, an hourly event time window will contain all records that carry an event timestamp that falls into that hour, regardless of the order in which they arrive, or when they are processed. (See the section on late events for more information.)
Note that sometimes when event time programs are processing live data in real-time, they will use some processing time operations in order to guarantee that they are progressing in a timely fashion.
Event time is the time at which the data was produced.
A watermark is a mechanism for measuring the progress of event time; it can be thought of as a hidden attribute of the data. Data processed by event time carries its own timestamp, e.g. 1472693399700 (2016-09-01 09:29:59.700), and the watermark for that record might be:
watermark(1472693399700) = 1472693396700 (2016-09-01 09:29:56.700)
What does this watermark mean? It asserts that all data with a timestamp smaller than 1472693396700 (2016-09-01 09:29:56.700) has already arrived.
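In this example the watermark trails the timestamp by a fixed bound; the 3000 ms bound below is an assumption chosen only to reproduce the numbers in the text:

```java
public class WatermarkArithmetic {
    // watermark(t) = t - allowedLateness: the watermark asserts that all
    // records with timestamps below it have arrived.
    static long watermark(long timestamp, long allowedLatenessMillis) {
        return timestamp - allowedLatenessMillis;
    }

    public static void main(String[] args) {
        long ts = 1472693399700L;                 // 2016-09-01 09:29:59.700
        System.out.println(watermark(ts, 3000L)); // 1472693396700 → 09:29:56.700
    }
}
```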
watermark是用于處理亂序事件的膳凝,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)恭陡。
我們知道鸠项,流處理從事件產(chǎn)生,到流經(jīng)source子姜,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下哥捕,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的牧抽,但是也不排除由于網(wǎng)絡(luò)、背壓等原因遥赚,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)扬舒。
但是對于late element,我們又不能無限期的等下去凫佛,必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后讲坎,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制愧薛,就是watermark晨炕。
通常,在接收到source的數(shù)據(jù)后毫炉,應(yīng)該立刻生成watermark瓮栗;但是,也可以在source后瞄勾,應(yīng)用簡單的map或者filter操作费奸,然后再生成watermark。
There are two main ways to generate watermarks:
(1) With Periodic Watermarks
(2) With Punctuated Watermarks
The first lets you define a maximum allowed out-of-orderness and is the more commonly used option.
Programming example
1)To work with event time, streaming programs need to set the time characteristic accordingly.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
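Stripped of the Flink types, the assigner's logic can be checked in plain Java; this sketch only simulates the timestamp-tracking and watermark computation, not the Flink runtime:

```java
public class BoundedOutOfOrdernessSketch {
    static final long MAX_OUT_OF_ORDERNESS = 3500; // 3.5 seconds, as above

    // Start low enough that the first watermark computation cannot underflow.
    long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    // Same body as extractTimestamp above: remember the highest timestamp seen.
    long extractTimestamp(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Watermark = highest timestamp seen minus the out-of-orderness bound.
    long currentWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch();
        // An out-of-order sequence: 10000 arrives, then the late element 8000.
        gen.extractTimestamp(10_000);
        gen.extractTimestamp(8_000); // does not move the maximum backwards
        System.out.println(gen.currentWatermark()); // 6500 = 10000 - 3500
    }
}
```

Note that the late element 8000 is still above the watermark 6500, so a window ending after it would still accept it.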
6 Core concepts of Flink parallelism
Reference: http://vinoyang.com/2016/05/02/flink-concepts/
Basic program execution
Inside Flink, programs execute in a parallel, distributed fashion. A stream is split into stream partitions and an operator into operator subtasks; these subtasks execute independently of one another in different threads, on different physical machines, or in different containers.
The number of subtasks of a particular operator is called its parallelism. The parallelism of a stream always equals the parallelism of its producing operator. Different operators in the same program may have different parallelism.
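A plain-Java sketch of how keyBy-style hash partitioning routes each key to one of the operator's subtasks; Flink's real key-group hashing differs in detail, so this only illustrates that the key-to-subtask mapping is deterministic:

```java
public class KeyPartitionSketch {
    // Hash partitioning: every element with the same key lands on the same
    // subtask, chosen from [0, parallelism). floorMod keeps the result
    // non-negative even for negative hash codes.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4; // an operator with four parallel subtasks
        // The same key is always routed to the same subtask.
        System.out.println(
            subtaskFor("user-42", parallelism) == subtaskFor("user-42", parallelism)); // true
    }
}
```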