原文鏈接:https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/run_example_quickstart.html#setting-up-a-maven-project
本指南從零開始,創(chuàng)建一個流分析的Flink項目,并在Flink集群中運行员咽。
Wikipedia(維基百科)提供了一個IRC渠道覆山,在這個渠道中所有編輯wiki的記錄都會被記錄下來。我們將通過Flink去讀取這個渠道的數(shù)據(jù)享潜,并通過一個指定的時間窗口去統(tǒng)計每個用戶編輯的字節(jié)數(shù)线婚。這個例子很簡單匈织,用Flink幾分鐘就可以實現(xiàn)了,但是會為你日后書寫更加復雜的流式分析程序打上了堅實的基礎授舟。
創(chuàng)建一個Maven項目(Setting up a Maven Project)
我們將使用Flink的Maven原形來創(chuàng)建我們的項目結構救恨,請參考Java API快速入門文檔來獲取跟多關于Flink Maven原形創(chuàng)建Flink項目的信息。(https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html)
對于我們這個例子來說释树,命令行如下:
$mvn archetype:generate\
-DarchetypeGroupId=org.apache.flink\
-DarchetypeArtifactId=flink-quickstart-java\
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/\
-DarchetypeVersion=1.4-SNAPSHOT\
-DgroupId=wiki-edits\
-DartifactId=wiki-edits\
-Dversion=0.1\
-Dpackage=wikiedits\
-DinteractiveMode=false
你可以通過自己的喜好來編輯groupId,artifactId和package肠槽。通過上述參數(shù),我們將創(chuàng)建一個如下圖所示的Maven工程:
這里有一個已經(jīng)添加了Flink依賴的pom.xml文件在根目錄中奢啥,還有幾個Flink的示例程序在src/main/java目錄下.因為我們要從頭開始秸仙,所以需要刪掉這些示例:
$rm wiki-edits/src/main/java/wikiedits/*.java
最后一步,我們需要將Flink Wikipedia的connector(連接器)添加到maven依賴中桩盲,這樣我們就可以在程序中直接調用了寂纪。編輯pom.xml中的dependencies模塊,如下所示:
注意:flink-connector-wikiedits_2.11的依賴已經(jīng)添加完畢赌结。(這個例子都到Apache Samza的Hello Samza示例的啟發(fā))
編寫一個Flink程序(Writing a Flink Program)
編碼時間到弊攘,打開你喜歡的IDE并導入maven工程抢腐,或者打開文本編輯器并創(chuàng)建src/main/java/wikiedits/wikieditAnalysis.java文件
packagewikiedits;
public?class?WikipediaAnalysis{
? ? ? ?public?static?void?main(String?[ ]?args)?throws?Exception{
? ? ? ?}
}
這段代碼現(xiàn)在還很簡單,但是我們后續(xù)會進一步的完善它襟交。注意:我這里不會給出導入的語句迈倍,因為IDE會自動的去導入。在本節(jié)的最后我會展示包括導入語句在內的完整代碼捣域,如果你想跳過前面啼染,可以將后面的完整代碼寫入你的編輯器中。
編寫Flink程序的第一步就是創(chuàng)建一個StreamExecutionEnvironment(如果你寫得是批處理的Job的話焕梅,就是ExecutionEnvironment)迹鹅。StreamExecutionEnvironment(或者ExecutionEnvironment)將用來設置運行參數(shù)以及創(chuàng)建從外部系統(tǒng)讀取數(shù)據(jù)的sources。接下來贞言,我們將StreamExecutionEnvironment添加到main方法中:
StreamExecutionEnvironment see?=?StreamExecutionEnvironment.getExecutionEnvironment();
接下來我們來創(chuàng)建一個讀取Wikipedia IRC日志的source
DataStream<WikipediaEditEvent>?edits?=?see.addSource(new?WikipediaEditsSource());
這創(chuàng)建了WikipediaEvent元素的DataStream斜棚,我們將進一步處理。在這個例子中我們主要關心的是每個用戶在一個給定的時間窗內添加了或者刪除了多少字節(jié)數(shù)该窗。為此我們首先通過用戶名來指定這個流的key弟蚀,也就是說流上的操作必須把用戶名考慮進去。在這個例子中在windows中的編輯總字節(jié)數(shù)必須是每一個唯一的用戶酗失。為了給Stream加key义钉,我們需要提供一個KeySelector類,如下:
KeyedStream<WikipediaEditEvent,String>keyedEdits=edits
.keyBy(new?KeySelector<WikipediaEditEvent,String>(){
? ? ? ? ? ?@Override
? ? ? ? ? ? public?String?getKey(WikipediaEditEventevent){
? ? ? ? ? ? ? ? ? ? ?return?event.getUser();
? ? ? ? ? ? }
});
這給了我們一個WikipediaEditEvent類型的流规肴,并有一個String類型的key—用戶名〈氛ⅲ現(xiàn)在我們可以在這個流上加一個window(窗口),并計算在這個窗口中的元素的結果。Window(窗口)指定了要執(zhí)行計算的流的一個分片拖刃。Window在執(zhí)行無限數(shù)據(jù)流元素的聚合操作時是很有必要的删壮。在我們的例子中我們需要計算每5秒鐘用戶編輯的字節(jié)數(shù)的聚合
DataStream<Tuple2<String,Long>>result=keyedEdits
.timeWindow(Time.seconds(5))
.fold(new?Tuple2<>("",0L),new?FoldFunction<WikipediaEditEvent,Tuple2<String,Long>>(){
? ? ? ? ? ?@Override
? ? ? ? ? ?public?Tuple2<String,Long>fold(Tuple2<String,Long>acc,WikipediaEditEventevent){
? ? ? ? ? ? ? ? ? ? ? ? ? acc.f0?=?event.getUser();
? ? ? ? ? ? ? ? ? ? ? ? ? acc.f1?+=?event.getByteDiff();
? ? ? ? ? ? ? ? ?return?acc;
? ? ? ? ? ?}
});
第一個調用函數(shù).timeWindow()指定了我們需要一個5秒鐘的翻滾窗口,第二個調用函數(shù)指定了一個根據(jù)每個用戶在每個窗口分片中的Fold轉換操作兑牡。在我們的例子中我們以一個初始值(“”,0L)開始醉锅,并將時間窗口中每個用戶的每次編輯的字節(jié)流添加進去,現(xiàn)在的Stream結果包含了每個用戶在5秒鐘內的產生的字節(jié)流Tuple2(String, Long),String是用戶名发绢,Long是字節(jié)數(shù)硬耍。
剩下的最后一件事就是將Stream打印到控制臺并啟動執(zhí)行程序:
?result.print();
?see.execute();
最后一個調用對于啟動一個Flink作業(yè)來說是非常有必要的。所有的操作边酒,例如創(chuàng)建sources经柴、transformations以及sinks都只是建立了一個內部的圖而已,只有當execute()被調用的時候墩朦,這些操作才會被提交到集群中去執(zhí)行或者在本地的機器上開始執(zhí)行坯认。
截止到現(xiàn)在的所有代碼如下:
package wikiedits;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public?class?WikipediaAnalysis?{
public?static?void?main(String?[]?args)throws?Exception{
? ? ? ? ? ?StreamExecutionEnvironment see?=?StreamExecutionEnvironment.getExecutionEnvironment();
? ? ? ? ? DataStream<WikipediaEditEvent>?edits?=?see.addSource(new?WikipediaEditsSource());
? ? ? ? ?KeyedStream<WikipediaEditEvent,String>?keyedEdits?=?edits
? ? ? ? ?.keyBy(new?KeySelector<WikipediaEditEvent,String>(){
? ? ? ? ? @Override
? ? ? ? ? public?String?getKey(?WikipediaEditEvent event?){
? ? ? ? ? ? ? ? ? ? ?returnevent.getUser();
? ? ? ? ? ?}
? ? ? ? ? });
? ? ? ? ? DataStream<Tuple2<String,Long>>?result?=?keyedEdits
? ? ? ? ? .timeWindow(Time.seconds(5))
? ? ? ? ? .fold(new?Tuple2<>("",0L),new?FoldFunction<WikipediaEditEvent,Tuple2<String,Long>>(){
? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? public?Tuple2<String,Long>fold(Tuple2<String,Long>?acc,WikipediaEditEvent event){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?acc.f0?=?event.getUser();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?acc.f1?+=?event.getByteDiff();
? ? ? ? ? ? ? ? ? ? ? ? ? ? returnacc;
? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? });
? ? ? ? ? result.print();
? ? ? ? ? see.execute();
? ? ? ? ?}
}
你可以在你的IDE中去執(zhí)行這個例子或者在Maven在命令行中執(zhí)行:
$mvn clean package
$mvn exec:java -Dexec.mainClass?=?wikiedits.WikipediaAnalysis
第一條命令是構建我們的項目,第二條命令是執(zhí)行我們的main方法的類,輸出的結果如下:
1>(Fenix down,114)
6>(AnomieBOT,155)
8>(BD2412bot,-3690)
7>(IgnorantArmies,49)
3>(Ckh3111,69)
5>(Slade360,0)
7>(Narutolovehinata5,2195)
6>(Vuyisa2001,79)
4>(Ms Sarah Welch,269)
4>(KasparBot,-245)
每一行前面的數(shù)字告訴你打印輸出的操作來自哪個并行實例
額外的訓練:運行在集群中并寫入Kafka
請在我們開始之前首先參考Flink搭建快速入門在你的機器中搭建一個Flink集群牛哺,參考Kafka快速入門搭建一個Kafka環(huán)境陋气。
第一步:我們需要將Flink Kafka的connector(連接器)作為maven依賴,這樣的話我們就可以直接調用Kafka Sink了引润,將Flink Kafka Connector添加到pom.xml文件的依賴部分:
下一步巩趁,我們需要修改我們的程序,我們將移除print() sink并使用Kafka sink來替代淳附。新的代碼如下:
result
.map(new?MapFunction<Tuple2<String,Long>,String>(){
? ? ? @Override
? ? ? ?public?String?map(Tuple2<String,Long>?tuple){
? ? ? ? ? ? ? ? ? ?return?tuple.toString();
? ? ? ? }
? })
? .addSink(new FlinkKafkaProducer08<>("localhost:9092","wiki-result",new?SimpleStringSchema()));
導入相關的類:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
注意:我們首先得將Tuple2類型的Stream通過MapFunction轉換成String類型的Stream议慰。這樣做主要是因為寫String到Kafka會更加容易。接下來我們創(chuàng)建一個Kafka的sink奴曙。你可能還需要適配一下你集群的hostname和端口别凹。”wiki-result”是Kafka stream的名稱洽糟,這是我們接下來要創(chuàng)建的炉菲,在我們跑程序之前,通過Maven來構建我們的工程因為在集群中運行的時候我們需要用到jar包坤溃。
$mvn clean package
結果jar包保存在target的子目錄中:target/wiki-edits-0.1.jar,接下來我們會用到這個jar包∨乃現(xiàn)在我們準備啟動Flink集群并在上面運行寫Kafka數(shù)據(jù)的程序。切換到你安裝Flink的目錄并啟動local集群浇雹。
$cd my/flink/directory
$bin/start-local.sh
同時我們還需要創(chuàng)建一個Kafka的Topic,這樣我們才能往這個Topic中寫數(shù)據(jù):
$cd my/kafka/directory
$bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results
現(xiàn)在我們準備在我們這個本地集群中運行我們的jar包:
$cd my/flink/directory
$bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
如果一切如期執(zhí)行的話屿讽,我們將得到如下的結果輸出:
03/08/2016 15:09:27 Job execution switched to status RUNNING.
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
你可以看到各個操作之間如何開始執(zhí)行昭灵,這里只有兩個操作,因為window之后的操作因為性能需要伐谈,合并到一個操作里去了烂完。在Flink中,我們稱之為chaining(鏈式操作)诵棵。
你可以使用Kafka的控制臺consumer來檢查Kafka的topic以觀察程序的輸出抠蚣。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result
你可以通過運行在http://localhost:8081上的Flink儀表盤來查看你的集群資源信息及運行的任務的信息。
如果你點擊你運行的任務的話履澳,你還可以看到一個視圖嘶窄,在這個視圖里你可以查看每一個操作,例如:查看已經(jīng)處理過的元素的個數(shù)距贷。