Chapter 2 Data Processing Using the DataStream API

Real-time analytics is currently an important topic. Many different domains need to process data in real time, and so far there have been multiple technologies trying to provide this capability; technologies such as Storm and Spark have been on the market for a long time now. Applications derived from the Internet of Things (IoT) need data to be stored, processed, and analyzed in real or near real time. In order to cater for such needs, Flink provides a streaming data processing API called the DataStream API.

In this chapter, we are going to look at the details relating to the DataStream API, covering the following topics:

  • Execution environment
  • Data sources
  • Transformations
  • Data sinks
  • Connectors
  • Use case - sensor data analytics

Any Flink program works on a certain defined anatomy, as follows:


[Figure: the anatomy of a Flink program]

We will be looking at each step of this anatomy and at how the DataStream API fits into it.
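Before diving into each step, here is a minimal end-to-end sketch of this anatomy in Java; the socket host and port are placeholder values used only for illustration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AnatomySketch {
    public static void main(String[] args) throws Exception {
        // Step 1: obtain an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 2: define a data source (a socket here; host/port are placeholders)
        DataStream<String> lines = env.socketTextStream("localhost", 9000);

        // Step 3: apply a transformation
        DataStream<String> upper = lines.map(line -> line.toUpperCase());

        // Step 4: define a data sink
        upper.print();

        // Step 5: trigger program execution
        env.execute("Anatomy sketch");
    }
}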

Execution environment

In order to start writing a Flink program, we first need to get an existing execution environment or create one.
Depending upon what you are trying to do, Flink supports:

  • Getting an already existing Flink environment
  • Creating a local environment
  • Creating a remote environment

Typically, you only need to use getExecutionEnvironment(). This will do the right thing based on your context: if you are executing inside an IDE, it will start a local execution environment, whereas if you are executing a JAR on a cluster, the Flink cluster manager will execute the program in a distributed manner.

If you want to create a local or remote environment on your own, you can also choose to do so by using the following methods:

  • createLocalEnvironment()
  • createRemoteEnvironment(String host, int port, String... jarFiles)
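As a sketch, the following shows all three ways of obtaining an environment; the host name, port, and JAR path are placeholder values, not fixed by this chapter:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentExamples {
    public static void main(String[] args) throws Exception {
        // Picks a local or cluster environment based on the execution context
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Explicitly creates a local environment (runs in the current JVM)
        StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

        // Connects to a remote cluster; host, port, and JAR path are placeholders
        StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
                "flink-master", 6123, "/path/to/your-job.jar");
    }
}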

Data sources

Sources are places from which the Flink program expects to get its data. This is the second step in the Flink program's anatomy. Flink supports a number of pre-implemented data source functions, and it also supports writing custom data source functions, so anything that is not supported out of the box can be programmed easily.
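For instance, a custom source only has to implement the SourceFunction interface. The following is a minimal sketch, assuming we want a source that emits an incrementing counter once per second; the class name and the emission rate are illustrative choices:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A hypothetical custom source emitting an incrementing counter
public class CounterSource implements SourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (running) {
            ctx.collect(counter++);   // emit the next value downstream
            Thread.sleep(1000L);      // throttle to one record per second
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Such a source would then be attached with env.addSource(new CounterSource()). With that in mind, first let's try to understand the built-in source functions.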

Socket-based

The DataStream API supports reading data from a socket. You just need to specify the host and port to read the data from, and it will do the work:

socketTextStream(hostName, port); // translator's note: the default delimiter is "\n"

You can also choose to specify the delimiter:

socketTextStream(hostName, port, delimiter)

You can also specify the maximum number of times the API should try to fetch the data:

socketTextStream(hostName, port, delimiter, maxRetry)
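Putting the socket source into a runnable sketch (host and port are placeholders; you could feed it with a tool such as netcat, for example nc -lk 9000):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from a socket; "localhost" and 9000 are placeholders
        DataStream<String> lines = env.socketTextStream("localhost", 9000);

        lines.print();
        env.execute("Socket source example");
    }
}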

File-based

You can also choose to stream data from a file source using the file-based source functions in Flink. You can use readTextFile(String path) to stream data from the file specified in the path. By default it uses TextInputFormat and reads strings line by line.

If the file format is other than text, you can specify the input format using this function:

readFile(FileInputFormat<Out> inputFormat, String path)

Flink also supports reading file streams as they are produced, using the readFileStream() function (translator's note: this method is deprecated; use readFile(FileInputFormat, String, FileProcessingMode, long) instead):

readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)

Translator's note: relevant excerpts from the Flink source follow.


/**
 * The mode in which the {@link ContinuousFileMonitoringFunction} operates.
 * This can be either {@link #PROCESS_ONCE} or {@link #PROCESS_CONTINUOUSLY}.
 */
@PublicEvolving
public enum FileProcessingMode {

    /** Processes the current contents of the path and exits. */
    PROCESS_ONCE,

    /** Periodically scans the path for new data. */
    PROCESS_CONTINUOUSLY
}


/**
     * The watch type of the {@code FileMonitoringFunction}.
     */
    public enum WatchType {
        ONLY_NEW_FILES, // Only new files will be processed.
        REPROCESS_WITH_APPENDED, // When some files are appended, all contents
                                    // of the files will be processed.
        PROCESS_ONLY_APPENDED // When some files are appended, only appended
                                // contents will be processed.
    }

You just need to specify the file path, the polling interval at which the file path should be polled, and the watch type. There are three watch types (translator's note: this method is deprecated, as the source excerpt above shows):

  • FileMonitoringFunction.WatchType.ONLY_NEW_FILES is used when the system should process only new files
  • FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED is used when the system should process only the appended contents of files
  • FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED is used when the system should re-process not only the appended contents of files but also the previous content in the files

If the file is not a text file, then we have the option to use the following function, which lets us define the file input format:

readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
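As a sketch of the non-deprecated readFile(FileInputFormat, String, FileProcessingMode, long) variant mentioned in the translator's note above, the following monitors a placeholder directory and re-scans it every second:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FileSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // "/tmp/input" is a placeholder directory to monitor
        TextInputFormat format = new TextInputFormat(new Path("/tmp/input"));

        // Re-scan the path every 1000 ms and stream any new data
        DataStream<String> lines = env.readFile(
                format, "/tmp/input", FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);

        lines.print();
        env.execute("File source example");
    }
}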

Internally, Flink divides the file-reading task into two sub-tasks. The first sub-task only monitors the file path based on the given WatchType; it is a non-parallel sub-task whose job is to keep scanning the file path at the polling interval, report the files to be processed, split the files, and assign the splits to the respective downstream threads. The second sub-task does the actual file reading in parallel:

[Figure: file path monitoring and parallel reading sub-tasks]

Transformations

Data transformations transform a data stream from one form into another. The input can be one or more data streams, and the output can be zero, one, or more data streams. Now let's try to understand each transformation, one by one.

Map

This is one of the simplest transformations: the input is one data stream and the output is also one data stream.
In Java:

inputStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 5 * value;
    }
});

In Scala:

inputStream.map { x => x * 5 }

FlatMap

FlatMap takes one record and outputs zero, one, or more records.
In Java:

inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

In Scala:

inputStream.flatMap { str => str.split(" ") }

Filter

Filter functions evaluate a condition for each record and emit only those records for which the condition is true; hence a filter can output zero records.

In Java:

inputStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 1;
    }
});

In Scala:

inputStream.filter { _ != 1 }

KeyBy

KeyBy logically partitions the stream based on the key. Internally it uses hash functions to partition the stream. It returns a KeyedDataStream.

In Java:

inputStream. KeyBy ("someKey");

In Scala:

inputStream.keyBy ("someKey")

Reduce

Reduce rolls up the KeyedDataStream by combining the last reduced value with the current value. The following code does a sum reduce of a KeyedDataStream.

In Java:

keyedInputStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

In Scala:

keyedInputStream.reduce { _ + _ }

Fold

Fold rolls up the KeyedDataStream by combining the last folded value with the current record. It emits a data stream back.
In Java:

keyedInputStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "=" + value;
    }
});

In Scala:

keyedInputStream. fold("start") ((str, i) =>str+"="+i}).

When applied on a stream of (1,2,3,4,5), the preceding function would emit a stream like this: start=1=2=3=4=5

Aggregations

The DataStream API supports various aggregations such as min, max, sum, and so on. These functions can be applied on a KeyedDataStream in order to get rolling aggregations.

In Java:

keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")

In Scala:

keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")

The difference between max and maxBy is that max returns the maximum value in the stream, whereas maxBy returns the record that has the maximum value. The same applies to min and minBy.
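To make the distinction concrete, here is a sketch on a hypothetical stream of (sensorId, timestamp, temperature) tuples, keyed by the sensor ID and aggregated on the temperature field; the data values are made up for illustration:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxVsMaxByExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical (sensorId, timestamp, temperature) readings
        DataStream<Tuple3<String, Long, Integer>> readings = env.fromElements(
                Tuple3.of("sensor-1", 1L, 10),
                Tuple3.of("sensor-1", 2L, 30),
                Tuple3.of("sensor-1", 3L, 20));

        // max(2) rolls the maximum of field 2 but keeps the other non-key
        // fields (here the timestamp) from the first record:
        // emits (sensor-1,1,10), (sensor-1,1,30), (sensor-1,1,30)
        readings.keyBy(0).max(2).print();

        // maxBy(2) emits the whole record holding the current maximum:
        // emits (sensor-1,1,10), (sensor-1,2,30), (sensor-1,2,30)
        readings.keyBy(0).maxBy(2).print();

        env.execute("max vs maxBy");
    }
}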
