Real-time analytics is currently an important topic. Many different domains need to process data in real time. So far there have been multiple technologies trying to provide this capability; technologies such as Storm and Spark have been on the market for a long time now. Applications derived from the Internet of Things (IoT) need data to be stored, processed, and analyzed in real or near real time. To cater for such needs, Flink provides a streaming data processing API called the DataStream API.
In this chapter, we are going to look at the details relating to DataStream API, covering the following topics:
- Execution environment
- Data sources
- Transformations
- Data sinks
- Connectors
- Use case - sensor data analytics
Any Flink program works on a certain defined anatomy as follows:
We will be looking at each step and how we can use DataStream API with this anatomy.
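As a rough sketch of that anatomy (obtaining an execution environment, reading from a source, applying a transformation, writing to a sink, and triggering execution), a minimal Java program might look like the following; the socket host and port are placeholder assumptions, not values from the original text:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AnatomyExample {
    public static void main(String[] args) throws Exception {
        // 1. Obtain an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read from a data source (placeholder host and port)
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // 3. Apply a transformation
        DataStream<String> upper = lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });

        // 4. Write to a data sink
        upper.print();

        // 5. Trigger execution
        env.execute("DataStream API anatomy");
    }
}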
Execution environment
In order to start writing a Flink program, we first need to get an existing execution environment or create one.
Depending upon what you are trying to do, Flink supports:
- Getting an already existing Flink environment
- Creating a local environment
- Creating a remote environment
Typically, you only need to use getExecutionEnvironment(). This will do the right thing based on your context. If you are executing on a local environment in an IDE then it will start a local execution environment. Otherwise, if you are executing the JAR, then the Flink cluster manager will execute the program in a distributed manner.
If you want to create a local or remote environment on your own then you can also choose to do so by using methods such as createLocalEnvironment() and createRemoteEnvironment(String host, int port, String... jarFiles).
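As a minimal sketch of these options (the remote host, port, and JAR path are placeholder assumptions):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentExample {
    public static void main(String[] args) throws Exception {
        // Let Flink pick the appropriate environment from the execution context
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Alternatively, create one explicitly:
        // StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment();
        // StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
        //         "jobmanager-host", 6123, "/path/to/program.jar"); // placeholder values

        env.fromElements(1, 2, 3).print();
        env.execute("Execution environment example");
    }
}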
Data sources
Sources are places where the Flink program expects to get its data from. This is the second step in the Flink program's anatomy. Flink supports a number of pre-implemented data source functions. It also supports writing custom data source functions, so anything that is not supported can be programmed easily. First let's try to understand the built-in source functions.
Socket-based
DataStream API supports reading data from a socket. You just need to specify the host and port to read the data from and it will do the work:
socketTextStream(hostName, port); // translator's note: the default delimiter is "\n"
You can also choose to specify the delimiter:
socketTextStream(hostName, port, delimiter)
You can also specify the maximum number of times the API should try to fetch the data:
socketTextStream(hostName, port, delimiter, maxRetry)
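Putting these variants together, a brief hedged sketch follows; the host, port, and maxRetry value are placeholder assumptions (in recent Flink versions the delimiter parameter of this overload is a String):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default "\n" delimiter
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // Explicit delimiter and a maxRetry value controlling reconnection behaviour (placeholder value)
        DataStream<String> linesWithRetry = env.socketTextStream("localhost", 9999, "\n", 3);

        lines.print();
        linesWithRetry.print();
        env.execute("Socket source example");
    }
}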
File-based
You can also choose to stream data from a file source using file-based source functions in Flink. You can use readTextFile(String path) to stream data from a file specified in the path. By default it uses TextInputFormat and reads strings line by line.
If the file format is other than text, you can specify the same using these functions:
readFile(FileInputFormat<OUT> inputFormat, String path)
Flink also supports reading file streams as they are produced, using the readFileStream() function:
// Translator's note: @deprecated Use {@link #readFile(FileInputFormat, String, FileProcessingMode, long)} instead
readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
Translator's excerpt from the relevant source code:
/**
* The mode in which the {@link ContinuousFileMonitoringFunction} operates.
* This can be either {@link #PROCESS_ONCE} or {@link #PROCESS_CONTINUOUSLY}.
*/
@PublicEvolving
public enum FileProcessingMode {
/** Processes the current contents of the path and exits. */
PROCESS_ONCE,
/** Periodically scans the path for new data. */
PROCESS_CONTINUOUSLY
}
/**
* The watch type of the {@code FileMonitoringFunction}.
*/
public enum WatchType {
ONLY_NEW_FILES, // Only new files will be processed.
REPROCESS_WITH_APPENDED, // When some files are appended, all contents
// of the files will be processed.
PROCESS_ONLY_APPENDED // When some files are appended, only appended
// contents will be processed.
}
You just need to specify the file path, the polling interval at which the file path should be polled, and the watch type. Watch types consist of three types:
- FileMonitoringFunction.WatchType.ONLY_NEW_FILES is used when the system should process only new files
- FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED is used when the system should process only the appended contents of files
- FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED is used when the system should re-process not only the appended contents of files but also the previous content in the file
If the file is not a text file, then we do have an option to use the following function, which lets us define the file input format:
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
Internally, it divides the file reading task into two sub-tasks. One sub-task only monitors the file path based on the given WatchType. The second sub-task does the actual file reading in parallel. The sub-task which monitors the file path is a non-parallel sub-task. Its job is to keep scanning the file path based on the polling interval, report the files to be processed, split the files, and assign the splits to the respective downstream threads.
Translator's note: is the path here a directory or a single file? Is each split a slice of one large file, or one of the small files under a directory?
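For the newer readFile variant referred to in the translator's note above, a hedged sketch might look like this; the directory path and the 10-second scan interval are assumptions for illustration:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FileSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String path = "/tmp/input"; // placeholder directory
        TextInputFormat format = new TextInputFormat(new Path(path));

        // Scan the path every 10 seconds and keep processing new data as it appears
        DataStream<String> lines = env.readFile(
                format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L);

        lines.print();
        env.execute("File source example");
    }
}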
Transformations
Data transformations transform the data stream from one form into another. The input could be one or more data streams, and the output could be zero, one, or more data streams. Now let's try to understand each transformation one by one.
Map
This is one of the simplest transformations, where the input is one data stream and the output is also one data stream.
In Java:
inputStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 5 * value;
    }
});
In Scala:
inputStream.map { x => x * 5 }
FlatMap
FlatMap takes one record and outputs zero, one, or more than one record.
In Java:
inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
In Scala:
inputStream.flatMap { str => str.split(" ") }
Filter
Filter functions evaluate the conditions and, only if the result is true, emit the record. Filter functions can output zero records.
In Java:
inputStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 1;
    }
});
In Scala:
inputStream.filter { _ != 1 }
KeyBy
KeyBy logically partitions the stream based on the key. Internally it uses hash functions to partition the stream. It returns a KeyedDataStream.
In Java:
inputStream.keyBy("someKey");
In Scala:
inputStream.keyBy("someKey")
Reduce
Reduce rolls out the KeyedDataStream by reducing the last reduced value with the current value. The following code does the sum reduce of a KeyedDataStream.
In Java:
keyedInputStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
In Scala:
keyedInputStream.reduce { _ + _ }
Fold
Fold rolls out the KeyedDataStream by combining the last folded value with the current record. It emits a data stream back.
In Java:
keyedInputStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "=" + value;
    }
});
In Scala:
keyedInputStream.fold("start") { (str, i) => str + "=" + i }
The preceding given function, when applied on a stream of (1,2,3,4,5), would emit a stream like this: start=1=2=3=4=5
Aggregations
The DataStream API supports various aggregations such as min, max, sum, and so on. These functions can be applied on a KeyedDataStream in order to get rolling aggregations.
In Java:
keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")
In Scala:
keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")
The difference between max and maxBy is that max returns the maximum value in a stream, whereas maxBy returns the record that has the maximum value. The same applies to min and minBy.
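To make the distinction concrete, here is a small hedged sketch on a keyed stream of tuples; the tuple fields and sample values are assumptions for illustration. max(2) updates only the aggregated field (the non-aggregated, non-key fields stay as in the first record seen), while maxBy(2) emits the complete record that holds the maximum value.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxVsMaxByExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (sensor type, sensor id, reading) - hypothetical sample data
        DataStream<Tuple3<String, String, Integer>> readings = env.fromElements(
                Tuple3.of("temperature", "sensor-1", 10),
                Tuple3.of("temperature", "sensor-2", 30),
                Tuple3.of("temperature", "sensor-3", 20));

        // Rolling maximum of field 2; field 1 keeps the value from the first record of the key
        readings.keyBy(0).max(2).print();

        // Emits the whole record that currently has the maximum value in field 2
        readings.keyBy(0).maxBy(2).print();

        env.execute("max vs maxBy example");
    }
}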