Flink編程入門案例

實(shí)時(shí)處理代碼開發(fā)
開發(fā)flink代碼导俘,實(shí)現(xiàn)統(tǒng)計(jì)socket當(dāng)中的單詞數(shù)量
第一步:創(chuàng)建maven工程呀伙,導(dǎo)入jar包
<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.8.1</version>
</dependency>

</dependencies>
<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

第二步:開發(fā)flink代碼統(tǒng)計(jì)socket當(dāng)中的單詞數(shù)量
開發(fā)flink代碼實(shí)現(xiàn)接受socket單詞數(shù)據(jù)蠢古,然后對數(shù)據(jù)進(jìn)行統(tǒng)計(jì)
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

case class CountWord(word:String,count:Long)

object FlinkCount {

def main(args: Array[String]): Unit = {
//獲取程序入口類
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//從socket當(dāng)中獲取數(shù)據(jù)
val result: DataStream[String] = environment.socketTextStream("node01",9000)
//導(dǎo)入隱式轉(zhuǎn)換的包整吆,否則時(shí)間不能使用
import org.apache.flink.api.scala._
//將數(shù)據(jù)進(jìn)行切割牢裳,封裝到樣例類當(dāng)中着饥,然后進(jìn)行統(tǒng)計(jì)
val resultValue: DataStream[CountWord] = result
.flatMap(x => x.split(" "))
.map(x => CountWord(x,1))
.keyBy("word")
// .timeWindow(Time.seconds(1),Time.milliseconds(1)) 按照每秒鐘時(shí)間窗口参咙,以及每秒鐘滑動間隔來進(jìn)行數(shù)據(jù)統(tǒng)計(jì)
.sum("count")
//打印最終輸出結(jié)果
resultValue.print().setParallelism(1)
//啟動服務(wù)
environment.execute()
}
}

第三步:打包上傳到服務(wù)器運(yùn)行
將我們的程序打包龄广,然后上傳到服務(wù)器進(jìn)行運(yùn)行,將我們打包好的程序上傳到node01服務(wù)器蕴侧,然后體驗(yàn)在各種模式下進(jìn)行運(yùn)行我們的程序
1择同、standAlone模式運(yùn)行程序
第一步:啟動flink集群
node01執(zhí)行以下命令啟動flink集群
cd /kkb/install/flink-1.8.1
bin/start-cluster.sh

第二步:啟動node01的socket服務(wù),并提交flink任務(wù)
node01執(zhí)行以下命令啟動node01的socket服務(wù)
nc -lk 9000
提交任務(wù)
將我們打包好的jar包上傳到node01服務(wù)器的/kkb路徑下净宵,然后提交任務(wù)敲才,注意,在pom.xml當(dāng)中需要添加我們的打包插件择葡,然后將任務(wù)代碼進(jìn)行打包紧武,且集群已有的代碼需要將打包scope設(shè)置為provided,在pom.xml將我們關(guān)于flink的jar包scope設(shè)置為provided

打包敏储,并將我們的jar-with-dependencies的jar包上傳到node01服務(wù)器的/kkb路徑下

node01執(zhí)行以下命令提交任務(wù)
cd /kkb/install/flink-1.8.1/
bin/flink run --class com.kkb.flink.demo1.FlinkCount /kkb/flink_day01-1.0-SNAPSHOT-jar-with-dependencies.jar
第三步:查詢運(yùn)行結(jié)果
node01查看運(yùn)行結(jié)果
cd /kkb/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-1-node01.kaikeba.com.out

注意:結(jié)果保存在以.out結(jié)尾的文件當(dāng)中阻星,哪個(gè)文件當(dāng)中有數(shù)據(jù),就查看哪個(gè)文件即可

離線批量處理代碼開發(fā)
flink也可以通過批量處理代碼來實(shí)現(xiàn)批量數(shù)據(jù)處理
需求:處理附件中的count.txt文件已添,實(shí)現(xiàn)單詞計(jì)數(shù)統(tǒng)計(jì)

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}

object BatchOperate {
def main(args: Array[String]): Unit = {

val inputPath = "D:\\count.txt"
val outPut = "D:\\data\\result2"

//獲取程序入口類ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile(inputPath)

//引入隱式轉(zhuǎn)換
import org.apache.flink.api.scala._

val value: AggregateDataSet[(String, Int)] = text.flatMap(x => x.split(" ")).map(x =>(x,1)).groupBy(0).sum(1)
value.writeAsText("d:\\datas\\result.txt").setParallelism(1)

env.execute("batch word count")

}

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末妥箕,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子更舞,更是在濱河造成了極大的恐慌畦幢,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)仗颈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門黍瞧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吗氏,“玉大人,你說我怎么就攤上這事雷逆。” “怎么了污尉?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵膀哲,是天一觀的道長。 經(jīng)常有香客問我被碗,道長某宪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任锐朴,我火速辦了婚禮兴喂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘焚志。我一直安慰自己衣迷,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布酱酬。 她就那樣靜靜地躺著壶谒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪膳沽。 梳的紋絲不亂的頭發(fā)上汗菜,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機(jī)與錄音挑社,去河邊找鬼陨界。 笑死,一個(gè)胖子當(dāng)著我的面吹牛痛阻,可吹牛的內(nèi)容都是我干的菌瘪。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼录平,長吁一口氣:“原來是場噩夢啊……” “哼麻车!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起斗这,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤动猬,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后表箭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體赁咙,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了彼水。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片崔拥。...
    茶點(diǎn)故事閱讀 40,096評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖凤覆,靈堂內(nèi)的尸體忽然破棺而出链瓦,到底是詐尸還是另有隱情,我是刑警寧澤盯桦,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布慈俯,位于F島的核電站,受9級特大地震影響拥峦,放射性物質(zhì)發(fā)生泄漏贴膘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一略号、第九天 我趴在偏房一處隱蔽的房頂上張望刑峡。 院中可真熱鬧,春花似錦玄柠、人聲如沸突梦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽阳似。三九已至,卻和暖如春铐伴,著一層夾襖步出監(jiān)牢的瞬間撮奏,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工当宴, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留畜吊,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓户矢,卻偏偏與公主長得像玲献,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子梯浪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容