Real-time processing code development
Develop Flink code that counts the words received from a socket.
Step 1: Create a Maven project and import the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Step 2: Develop the Flink code that counts the words received from the socket
Develop Flink code that receives word data from a socket and then counts the words.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

case class CountWord(word: String, count: Long)

object FlinkCount {
  def main(args: Array[String]): Unit = {
    // Get the program entry point
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read data from the socket
    val result: DataStream[String] = environment.socketTextStream("node01", 9000)
    // Import the implicit conversions; without them the transformations below will not compile
    import org.apache.flink.api.scala._
    // Split the lines, wrap each word in the case class, then count
    val resultValue: DataStream[CountWord] = result
      .flatMap(x => x.split(" "))
      .map(x => CountWord(x, 1))
      .keyBy("word")
      // .timeWindow(Time.seconds(1), Time.seconds(1)) // count within 1-second windows, sliding every second
      .sum("count")
    // Print the final result
    resultValue.print().setParallelism(1)
    // Start the job
    environment.execute()
  }
}
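The commented-out timeWindow call shows how the same pipeline can aggregate per time window instead of keeping a running total. Below is a minimal sketch of that variant, assuming the CountWord case class from above is in scope; the object name FlinkWindowCount and the 5-second window size are chosen only for illustration.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkWindowCount {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read words from the same socket as in the example above
    val lines: DataStream[String] = environment.socketTextStream("node01", 9000)
    val windowed: DataStream[CountWord] = lines
      .flatMap(x => x.split(" "))
      .map(x => CountWord(x, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(5)) // tumbling window of 5 seconds
      .sum("count")
    windowed.print().setParallelism(1)
    environment.execute("windowed word count")
  }
}

Instead of emitting an updated running count for every incoming word, this version emits one count per word for each 5-second window.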
Step 3: Package, upload to the server, and run
Package the program and upload it to the server to run. Upload the packaged program to the node01 server and then try running it in the different deployment modes.
1. Running the program in standalone mode
Step 1: Start the Flink cluster
Run the following commands on node01 to start the Flink cluster
cd /kkb/install/flink-1.8.1
bin/start-cluster.sh
Step 2: Start the socket service on node01 and submit the Flink job
Run the following command on node01 to start the socket service
nc -lk 9000
Submit the job
Upload the packaged jar to the /kkb directory on the node01 server and then submit the job. Note that pom.xml must contain the assembly plugin shown above so the job code can be packaged, and dependencies that already exist on the cluster (the Flink jars) should have their scope set to provided in pom.xml, as sketched below.
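For example, each Flink dependency would be marked as provided; a minimal sketch for the streaming dependency used above:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.1</version>
    <scope>provided</scope>
</dependency>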
Package the project and upload the resulting jar-with-dependencies jar to the /kkb directory on the node01 server.
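Packaging is normally done with Maven from the project root; with the assembly plugin configured above, the jar-with-dependencies jar is produced under the target directory (the exact jar name depends on your artifactId and version):

mvn clean package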
Run the following commands on node01 to submit the job
cd /kkb/install/flink-1.8.1/
bin/flink run --class com.kkb.flink.demo1.FlinkCount /kkb/flink_day01-1.0-SNAPSHOT-jar-with-dependencies.jar
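Once the job is submitted, it can also be checked in the Flink web UI, which in a default standalone setup listens on port 8081 of the JobManager node (http://node01:8081); this assumes the default rest.port configuration.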
Step 3: Check the results
Check the results on node01
cd /kkb/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-1-node01.kaikeba.com.out
Note: the results are written to the files ending in .out; check whichever .out file contains data.
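For example, typing hello hello world into the nc session would produce output along the lines of:

CountWord(hello,1)
CountWord(hello,2)
CountWord(world,1)

because sum("count") emits an updated running total every time a word arrives.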
Offline batch processing code development
Flink can also process data in batch mode with batch-processing code.
Requirement: process the count.txt file from the attachment and count the words in it.
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}

object BatchOperate {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\count.txt"
    val outPut = "D:\\data\\result2"
    // Get the program entry point ExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = env.readTextFile(inputPath)
    // Import the implicit conversions
    import org.apache.flink.api.scala._
    // Split each line into words, map to (word, 1), group by the word and sum the counts
    val value: AggregateDataSet[(String, Int)] = text
      .flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .groupBy(0)
      .sum(1)
    // Write the result to the output path as a single file
    value.writeAsText(outPut).setParallelism(1)
    env.execute("batch word count")
  }
}
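For a count.txt that contains, for instance, the single line hello world hello flink, the result file would contain tuples along the lines of:

(hello,2)
(world,1)
(flink,1)

The order of the tuples may differ depending on how the data is grouped.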