監(jiān)控維基百科編輯流(隨意翻譯---不一定準確)

原文鏈接:https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/run_example_quickstart.html#setting-up-a-maven-project

本指南從零開始,創(chuàng)建一個流分析的Flink項目,并在Flink集群中運行员咽。

Wikipedia(維基百科)提供了一個IRC渠道覆山,在這個渠道中所有編輯wiki的記錄都會被記錄下來。我們將通過Flink去讀取這個渠道的數(shù)據(jù)享潜,并通過一個指定的時間窗口去統(tǒng)計每個用戶編輯的字節(jié)數(shù)线婚。這個例子很簡單匈织,用Flink幾分鐘就可以實現(xiàn)了,但是會為你日后書寫更加復雜的流式分析程序打上了堅實的基礎授舟。

創(chuàng)建一個Maven項目(Setting up a Maven Project)

我們將使用Flink的Maven原形來創(chuàng)建我們的項目結構救恨,請參考Java API快速入門文檔來獲取跟多關于Flink Maven原形創(chuàng)建Flink項目的信息。(https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html)

對于我們這個例子來說释树,命令行如下:

$mvn archetype:generate\

-DarchetypeGroupId=org.apache.flink\

-DarchetypeArtifactId=flink-quickstart-java\

-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/\

-DarchetypeVersion=1.4-SNAPSHOT\

-DgroupId=wiki-edits\

-DartifactId=wiki-edits\

-Dversion=0.1\

-Dpackage=wikiedits\

-DinteractiveMode=false

你可以通過自己的喜好來編輯groupId,artifactId和package肠槽。通過上述參數(shù),我們將創(chuàng)建一個如下圖所示的Maven工程:


這里有一個已經(jīng)添加了Flink依賴的pom.xml文件在根目錄中奢啥,還有幾個Flink的示例程序在src/main/java目錄下.因為我們要從頭開始秸仙,所以需要刪掉這些示例:

$rm wiki-edits/src/main/java/wikiedits/*.java

最后一步,我們需要將Flink Wikipedia的connector(連接器)添加到maven依賴中桩盲,這樣我們就可以在程序中直接調用了寂纪。編輯pom.xml中的dependencies模塊,如下所示:


注意:flink-connector-wikiedits_2.11的依賴已經(jīng)添加完畢赌结。(這個例子都到Apache Samza的Hello Samza示例的啟發(fā))

編寫一個Flink程序(Writing a Flink Program)

編碼時間到弊攘,打開你喜歡的IDE并導入maven工程抢腐,或者打開文本編輯器并創(chuàng)建src/main/java/wikiedits/wikieditAnalysis.java文件

packagewikiedits;

public?class?WikipediaAnalysis{

? ? ? ?public?static?void?main(String?[ ]?args)?throws?Exception{

? ? ? ?}

}

這段代碼現(xiàn)在還很簡單,但是我們后續(xù)會進一步的完善它襟交。注意:我這里不會給出導入的語句迈倍,因為IDE會自動的去導入。在本節(jié)的最后我會展示包括導入語句在內的完整代碼捣域,如果你想跳過前面啼染,可以將后面的完整代碼寫入你的編輯器中。

編寫Flink程序的第一步就是創(chuàng)建一個StreamExecutionEnvironment(如果你寫得是批處理的Job的話焕梅,就是ExecutionEnvironment)迹鹅。StreamExecutionEnvironment(或者ExecutionEnvironment)將用來設置運行參數(shù)以及創(chuàng)建從外部系統(tǒng)讀取數(shù)據(jù)的sources。接下來贞言,我們將StreamExecutionEnvironment添加到main方法中:

StreamExecutionEnvironment see?=?StreamExecutionEnvironment.getExecutionEnvironment();

接下來我們來創(chuàng)建一個讀取Wikipedia IRC日志的source

DataStream<WikipediaEditEvent>?edits?=?see.addSource(new?WikipediaEditsSource());

這創(chuàng)建了WikipediaEvent元素的DataStream斜棚,我們將進一步處理。在這個例子中我們主要關心的是每個用戶在一個給定的時間窗內添加了或者刪除了多少字節(jié)數(shù)该窗。為此我們首先通過用戶名來指定這個流的key弟蚀,也就是說流上的操作必須把用戶名考慮進去。在這個例子中在windows中的編輯總字節(jié)數(shù)必須是每一個唯一的用戶酗失。為了給Stream加key义钉,我們需要提供一個KeySelector類,如下:

KeyedStream<WikipediaEditEvent,String>keyedEdits=edits

.keyBy(new?KeySelector<WikipediaEditEvent,String>(){

? ? ? ? ? ?@Override

? ? ? ? ? ? public?String?getKey(WikipediaEditEventevent){

? ? ? ? ? ? ? ? ? ? ?return?event.getUser();

? ? ? ? ? ? }

});

這給了我們一個WikipediaEditEvent類型的流规肴,并有一個String類型的key—用戶名〈氛ⅲ現(xiàn)在我們可以在這個流上加一個window(窗口),并計算在這個窗口中的元素的結果。Window(窗口)指定了要執(zhí)行計算的流的一個分片拖刃。Window在執(zhí)行無限數(shù)據(jù)流元素的聚合操作時是很有必要的删壮。在我們的例子中我們需要計算每5秒鐘用戶編輯的字節(jié)數(shù)的聚合

DataStream<Tuple2<String,Long>>result=keyedEdits

.timeWindow(Time.seconds(5))

.fold(new?Tuple2<>("",0L),new?FoldFunction<WikipediaEditEvent,Tuple2<String,Long>>(){

? ? ? ? ? ?@Override

? ? ? ? ? ?public?Tuple2<String,Long>fold(Tuple2<String,Long>acc,WikipediaEditEventevent){

? ? ? ? ? ? ? ? ? ? ? ? ? acc.f0?=?event.getUser();

? ? ? ? ? ? ? ? ? ? ? ? ? acc.f1?+=?event.getByteDiff();

? ? ? ? ? ? ? ? ?return?acc;

? ? ? ? ? ?}

});

第一個調用函數(shù).timeWindow()指定了我們需要一個5秒鐘的翻滾窗口,第二個調用函數(shù)指定了一個根據(jù)每個用戶在每個窗口分片中的Fold轉換操作兑牡。在我們的例子中我們以一個初始值(“”,0L)開始醉锅,并將時間窗口中每個用戶的每次編輯的字節(jié)流添加進去,現(xiàn)在的Stream結果包含了每個用戶在5秒鐘內的產生的字節(jié)流Tuple2(String, Long),String是用戶名发绢,Long是字節(jié)數(shù)硬耍。

剩下的最后一件事就是將Stream打印到控制臺并啟動執(zhí)行程序:

?result.print();

?see.execute();

最后一個調用對于啟動一個Flink作業(yè)來說是非常有必要的。所有的操作边酒,例如創(chuàng)建sources经柴、transformations以及sinks都只是建立了一個內部的圖而已,只有當execute()被調用的時候墩朦,這些操作才會被提交到集群中去執(zhí)行或者在本地的機器上開始執(zhí)行坯认。

截止到現(xiàn)在的所有代碼如下:

package wikiedits;

import org.apache.flink.api.common.functions.FoldFunction;

import org.apache.flink.api.java.functions.KeySelector;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public?class?WikipediaAnalysis?{

public?static?void?main(String?[]?args)throws?Exception{

? ? ? ? ? ?StreamExecutionEnvironment see?=?StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? ? DataStream<WikipediaEditEvent>?edits?=?see.addSource(new?WikipediaEditsSource());

? ? ? ? ?KeyedStream<WikipediaEditEvent,String>?keyedEdits?=?edits

? ? ? ? ?.keyBy(new?KeySelector<WikipediaEditEvent,String>(){

? ? ? ? ? @Override

? ? ? ? ? public?String?getKey(?WikipediaEditEvent event?){

? ? ? ? ? ? ? ? ? ? ?returnevent.getUser();

? ? ? ? ? ?}

? ? ? ? ? });

? ? ? ? ? DataStream<Tuple2<String,Long>>?result?=?keyedEdits

? ? ? ? ? .timeWindow(Time.seconds(5))

? ? ? ? ? .fold(new?Tuple2<>("",0L),new?FoldFunction<WikipediaEditEvent,Tuple2<String,Long>>(){

? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? public?Tuple2<String,Long>fold(Tuple2<String,Long>?acc,WikipediaEditEvent event){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?acc.f0?=?event.getUser();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?acc.f1?+=?event.getByteDiff();

? ? ? ? ? ? ? ? ? ? ? ? ? ? returnacc;

? ? ? ? ? ? ? ? ?}

? ? ? ? ? ? });

? ? ? ? ? result.print();

? ? ? ? ? see.execute();

? ? ? ? ?}

}

你可以在你的IDE中去執(zhí)行這個例子或者在Maven在命令行中執(zhí)行:

$mvn clean package

$mvn exec:java -Dexec.mainClass?=?wikiedits.WikipediaAnalysis

第一條命令是構建我們的項目,第二條命令是執(zhí)行我們的main方法的類,輸出的結果如下:

1>(Fenix down,114)

6>(AnomieBOT,155)

8>(BD2412bot,-3690)

7>(IgnorantArmies,49)

3>(Ckh3111,69)

5>(Slade360,0)

7>(Narutolovehinata5,2195)

6>(Vuyisa2001,79)

4>(Ms Sarah Welch,269)

4>(KasparBot,-245)

每一行前面的數(shù)字告訴你打印輸出的操作來自哪個并行實例

額外的訓練:運行在集群中并寫入Kafka

請在我們開始之前首先參考Flink搭建快速入門在你的機器中搭建一個Flink集群牛哺,參考Kafka快速入門搭建一個Kafka環(huán)境陋气。

第一步:我們需要將Flink Kafka的connector(連接器)作為maven依賴,這樣的話我們就可以直接調用Kafka Sink了引润,將Flink Kafka Connector添加到pom.xml文件的依賴部分:

下一步巩趁,我們需要修改我們的程序,我們將移除print() sink并使用Kafka sink來替代淳附。新的代碼如下:

result

.map(new?MapFunction<Tuple2<String,Long>,String>(){

? ? ? @Override

? ? ? ?public?String?map(Tuple2<String,Long>?tuple){

? ? ? ? ? ? ? ? ? ?return?tuple.toString();

? ? ? ? }

? })

? .addSink(new FlinkKafkaProducer08<>("localhost:9092","wiki-result",new?SimpleStringSchema()));

導入相關的類:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import org.apache.flink.api.common.functions.MapFunction;

注意:我們首先得將Tuple2類型的Stream通過MapFunction轉換成String類型的Stream议慰。這樣做主要是因為寫String到Kafka會更加容易。接下來我們創(chuàng)建一個Kafka的sink奴曙。你可能還需要適配一下你集群的hostname和端口别凹。”wiki-result”是Kafka stream的名稱洽糟,這是我們接下來要創(chuàng)建的炉菲,在我們跑程序之前,通過Maven來構建我們的工程因為在集群中運行的時候我們需要用到jar包坤溃。

$mvn clean package

結果jar包保存在target的子目錄中:target/wiki-edits-0.1.jar,接下來我們會用到這個jar包∨乃現(xiàn)在我們準備啟動Flink集群并在上面運行寫Kafka數(shù)據(jù)的程序。切換到你安裝Flink的目錄并啟動local集群浇雹。

$cd my/flink/directory

$bin/start-local.sh

同時我們還需要創(chuàng)建一個Kafka的Topic,這樣我們才能往這個Topic中寫數(shù)據(jù):

$cd my/kafka/directory

$bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results

現(xiàn)在我們準備在我們這個本地集群中運行我們的jar包:

$cd my/flink/directory

$bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar

如果一切如期執(zhí)行的話屿讽,我們將得到如下的結果輸出:

03/08/2016 15:09:27 Job execution switched to status RUNNING.

03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED

03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING

03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED

03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING

03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING

03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

你可以看到各個操作之間如何開始執(zhí)行昭灵,這里只有兩個操作,因為window之后的操作因為性能需要伐谈,合并到一個操作里去了烂完。在Flink中,我們稱之為chaining(鏈式操作)诵棵。

你可以使用Kafka的控制臺consumer來檢查Kafka的topic以觀察程序的輸出抠蚣。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result

你可以通過運行在http://localhost:8081上的Flink儀表盤來查看你的集群資源信息及運行的任務的信息。


如果你點擊你運行的任務的話履澳,你還可以看到一個視圖嘶窄,在這個視圖里你可以查看每一個操作,例如:查看已經(jīng)處理過的元素的個數(shù)距贷。


最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末柄冲,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子忠蝗,更是在濱河造成了極大的恐慌现横,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異戒祠,居然都是意外死亡骇两,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進店門姜盈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來低千,“玉大人,你說我怎么就攤上這事贩据《安伲” “怎么了?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵饱亮,是天一觀的道長矾芙。 經(jīng)常有香客問我,道長近上,這世上最難降的妖魔是什么剔宪? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮壹无,結果婚禮上葱绒,老公的妹妹穿的比我還像新娘。我一直安慰自己斗锭,他們只是感情好地淀,可當我...
    茶點故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著岖是,像睡著了一般帮毁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上豺撑,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天烈疚,我揣著相機與錄音,去河邊找鬼聪轿。 笑死爷肝,一個胖子當著我的面吹牛,可吹牛的內容都是我干的陆错。 我是一名探鬼主播灯抛,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼音瓷!你這毒婦竟也來了牧愁?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤外莲,失蹤者是張志新(化名)和其女友劉穎猪半,沒想到半個月后兔朦,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡磨确,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年沽甥,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片乏奥。...
    茶點故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡摆舟,死狀恐怖,靈堂內的尸體忽然破棺而出邓了,到底是詐尸還是另有隱情恨诱,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布骗炉,位于F島的核電站照宝,受9級特大地震影響,放射性物質發(fā)生泄漏句葵。R本人自食惡果不足惜厕鹃,卻給世界環(huán)境...
    茶點故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望乍丈。 院中可真熱鬧剂碴,春花似錦、人聲如沸轻专。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽请垛。三九已至催训,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間叼屠,已是汗流浹背瞳腌。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工绞铃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留镜雨,地道東北人。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓儿捧,卻偏偏與公主長得像荚坞,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子菲盾,可洞房花燭夜當晚...
    茶點故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內容