flink基礎(chǔ)——簡單原理介紹

image.png

總結(jié)來看扯夭,流處理是實(shí)時(shí)的鳍贾,數(shù)據(jù)過來,接收后放入緩存勉抓,處理節(jié)點(diǎn)會立刻從緩存中拉取數(shù)據(jù)進(jìn)行處理贾漏。批處理則是數(shù)據(jù)過來,序列化到緩存藕筋,并不會立即處理纵散。flink通過引入超時(shí)值同時(shí)支持兩種方式,當(dāng)超時(shí)值為0時(shí)表示流處理隐圾。

flink編程基本步驟

基本步驟
1 運(yùn)行環(huán)境

我們首先要獲得已經(jīng)存在的運(yùn)行環(huán)境或者創(chuàng)建它伍掀。有3種方法得到運(yùn)行環(huán)境:

(1)通過getExecutionEnvironment()獲得;這將根據(jù)上下文得到運(yùn)行環(huán)境暇藏,假如local模式蜜笤,則它會創(chuàng)建一個(gè)local的運(yùn)行環(huán)境;假如是集群模式盐碱,則會創(chuàng)建一個(gè)分布式的運(yùn)行環(huán)境把兔;
(2)通過createLocalEnvironment() 創(chuàng)建一個(gè)本地的運(yùn)行環(huán)境;
(3)通過createRemoteEnvironment (String host, int port, String, and .jar files)創(chuàng)建一個(gè)遠(yuǎn)程的運(yùn)行環(huán)境瓮顽。

2 數(shù)據(jù)源

Flink支持許多預(yù)定義的數(shù)據(jù)源县好,同時(shí)也支持自定義數(shù)據(jù)源。

2.1 基于socket

DataStream API支持從socket讀取數(shù)據(jù)暖混,有如下3個(gè)方法:

socketTextStream(hostName, port);
socketTextStream(hostName,port,delimiter)
socketTextStream(hostName,port,delimiter, maxRetry)
2.2 基于文件

你可以使用readTextFile(String path)來消費(fèi)文件中的數(shù)據(jù)作為流數(shù)據(jù)的來源缕贡,默認(rèn)情況下的格式是TextInputFormat。當(dāng)然你也可以通過readFile(FileInputFormat inputFormat, String path)來指定FileInputFormat的格式。

Flink同樣支持讀取文件流:

readFileStream(String filePath, long intervalMillis,
FileMonitoringFunction.WatchType watchType)

readFile(fileInputFormat, path, watchType, interval, pathFilter,
typeInfo)晾咪。

3 Transformation

Transformation允許將數(shù)據(jù)從一種形式轉(zhuǎn)換為另一種形式收擦,輸入可以是1個(gè)源也可以是多個(gè),輸出則可以是0個(gè)谍倦、1個(gè)或者多個(gè)塞赂。下面我們一一介紹這些Transformations。

3.1 Map

輸入1個(gè)元素剂跟,輸出一個(gè)元素减途,Java API如下:

inputStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 5 * value;
}
});
3.2 FlatMap

輸入1個(gè)元素,輸出0個(gè)曹洽、1個(gè)或多個(gè)元素鳍置,Java API如下:

inputStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
3.3 Filter

條件過濾時(shí)使用,當(dāng)結(jié)果為true時(shí)送淆,輸出記錄税产;

inputStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 1;
}
});
3.4 keyBy

邏輯上按照key分組,內(nèi)部使用hash函數(shù)進(jìn)行分組偷崩,返回keyedDataStream:

inputStream.keyBy("someKey");
3.5 Reduce

keyedStream流上辟拷,將上一次reduce的結(jié)果和本次的進(jìn)行操作,例如sum reduce的例子:

keyedInputStream. reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
3.6 Fold

在keyedStream流上的記錄進(jìn)行連接操作阐斜,例如:

keyedInputStream keyedStream.fold("Start", new FoldFunction<Integer,
String>() {
@Override
public String fold(String current, Integer value) {
return current + "=" + value;
}
});

假如是一個(gè)(1,2,3,4,5)的流衫冻,那么結(jié)果將是:Start=1=2=3=4=5

3.7 Aggregation

在keyedStream上應(yīng)用類似min、max等聚合操作:

keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")

4. 深入

http://www.reibang.com/p/f9d447a3c48f
http://vinoyang.com/2016/06/22/flink-data-stream-partitioner/

4. Flink DataStream API Programming Guide

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html

5. 事件時(shí)間

參考:https://blog.csdn.net/lmalds/article/details/52704170

Event time is the time that each individual event occurred on its producing device. This time is typically embedded within the records before they enter Flink, and that event timestamp can be extracted from each record. In event time, the progress of time depends on the data, not on any wall clocks. Event time programs must specify how to generate Event Time Watermarks, which is the mechanism that signals progress in event time. This watermarking mechanism is described in a later section, below.
In a perfect world, event time processing would yield completely consistent and deterministic results, regardless of when events arrive, or their ordering. However, unless the events are known to arrive in-order (by timestamp), event time processing incurs some latency while waiting for out-of-order events. As it is only possible to wait for a finite period of time, this places a limit on how deterministic event time applications can be.
Assuming all of the data has arrived, event time operations will behave as expected, and produce correct and consistent results even when working with out-of-order or late events, or when reprocessing historic data. For example, an hourly event time window will contain all records that carry an event timestamp that falls into that hour, regardless of the order in which they arrive, or when they are processed. (See the section on late events for more information.)
Note that sometimes when event time programs are processing live data in real-time, they will use some processing time operations in order to guarantee that they are progressing in a timely fashion.

event time是指數(shù)據(jù)產(chǎn)生時(shí)的時(shí)間谒出。
watermark是一種衡量Event Time進(jìn)展的機(jī)制隅俘,它是數(shù)據(jù)本身的一個(gè)隱藏屬性。通丑栽基于Event Time的數(shù)據(jù)为居,自身都包含一個(gè)timestamp,例如1472693399700(2016-09-01 09:29:59.700)杀狡,而這條數(shù)據(jù)的watermark時(shí)間則可能是:

watermark(1472693399700) = 1472693396700(2016-09-01 09:29:56.700)

這條數(shù)據(jù)的watermark時(shí)間是什么含義呢蒙畴?即:timestamp小于1472693396700(2016-09-01 09:29:56.700)的數(shù)據(jù),都已經(jīng)到達(dá)了呜象。
watermark是用于處理亂序事件的膳凝,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)恭陡。
我們知道鸠项,流處理從事件產(chǎn)生,到流經(jīng)source子姜,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下哥捕,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的牧抽,但是也不排除由于網(wǎng)絡(luò)、背壓等原因遥赚,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)扬舒。
但是對于late element,我們又不能無限期的等下去凫佛,必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后讲坎,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制愧薛,就是watermark晨炕。
通常,在接收到source的數(shù)據(jù)后毫炉,應(yīng)該立刻生成watermark瓮栗;但是,也可以在source后瞄勾,應(yīng)用簡單的map或者filter操作费奸,然后再生成watermark。
生成watermark的方式主要有2大類:

(1):With Periodic Watermarks
(2):With Punctuated Watermarks

第一種可以定義一個(gè)最大允許亂序的時(shí)間进陡,這種情況應(yīng)用較多愿阐。

編程示例

1)To work with event time, streaming programs need to set the time characteristic accordingly.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

6. flink 并行相關(guān)的核心概念

參考 : http://vinoyang.com/2016/05/02/flink-concepts/
程序基本流程

image.png

程序在Flink內(nèi)部的執(zhí)行具有并行、分布式的特性趾疚。stream被分割成stream partition缨历,operator被分割成operator subtask,這些operator subtasks在不同的線程盗蟆、不同的物理機(jī)或不同的容器中彼此互不依賴得執(zhí)行戈二。

一個(gè)特定operator的subtask的個(gè)數(shù)被稱之為其parallelism(并行度)。一個(gè)stream的并行度總是等同于其producing operator的并行度喳资。一個(gè)程序中觉吭,不同的operator可能具有不同的并行度。


image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末仆邓,一起剝皮案震驚了整個(gè)濱河市鲜滩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌节值,老刑警劉巖徙硅,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異搞疗,居然都是意外死亡嗓蘑,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來桩皿,“玉大人豌汇,你說我怎么就攤上這事⌒垢簦” “怎么了拒贱?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長佛嬉。 經(jīng)常有香客問我逻澳,道長,這世上最難降的妖魔是什么暖呕? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任斜做,我火速辦了婚禮,結(jié)果婚禮上缰揪,老公的妹妹穿的比我還像新娘陨享。我一直安慰自己,他們只是感情好钝腺,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布抛姑。 她就那樣靜靜地躺著,像睡著了一般艳狐。 火紅的嫁衣襯著肌膚如雪定硝。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天毫目,我揣著相機(jī)與錄音蔬啡,去河邊找鬼。 笑死镀虐,一個(gè)胖子當(dāng)著我的面吹牛箱蟆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播刮便,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼空猜,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了恨旱?” 一聲冷哼從身側(cè)響起辈毯,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎搜贤,沒想到半個(gè)月后谆沃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡仪芒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年唁影,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了耕陷。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡据沈,死狀恐怖啃炸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情卓舵,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布膀钠,位于F島的核電站掏湾,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏肿嘲。R本人自食惡果不足惜融击,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望雳窟。 院中可真熱鬧尊浪,春花似錦、人聲如沸封救。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽誉结。三九已至鹅士,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間惩坑,已是汗流浹背掉盅。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留以舒,地道東北人趾痘。 一個(gè)月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像蔓钟,于是被迫代替她去往敵國和親永票。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi閱讀 7,322評論 0 10
  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,451評論 0 13
  • 寫作肚吏,高大上、使人敬仰狭魂。通過文字罚攀,傳播思想或者故事或者心靈契約的宣導(dǎo)党觅。把靈魂和文字雜糅,神圣的產(chǎn)物斋泄。 在有了網(wǎng)絡(luò)杯瞻,...
    走在雨的縫中閱讀 215評論 5 3
  • 臨時(shí)變量 首先看一段代碼: 這個(gè)時(shí)候可以想一想會輸出什么? mi的值是什么炫掐?運(yùn)行結(jié)果如下: 程序意圖:在Test(...
    nethanhan閱讀 991評論 0 0
  • 平平淡淡的一天過去了魁莉,又可以躺在床上享受這一天中的美好時(shí)光了,懷里摟著熟睡的二寶募胃,想想大寶小的時(shí)候那真叫一個(gè)聽話...
    博碩媽閱讀 148評論 0 0