這篇文章以flink官方local cluster教程為主線写烤,引導(dǎo)大家體驗(yàn)一下flink的初次開發(fā)。文章中所提到的代碼我已經(jīng)放到github上肄程,歡迎指正选浑。
下載和啟動(dòng)Flink
Flink可以運(yùn)行在Linux、Mac和Windows上拓提,唯一的要求就是必須安裝Java 8或以上版本隧膘∷卤梗可以使用一下命令查看:
(py2.7) bogon:flink-examples yss$ java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
java安裝后去官網(wǎng)下載Flink蹦疑,然后解壓即可運(yùn)行。這里以flink-1.7.0為例艇肴。
cd ~/Downloads/
tar xzf flink-1.7.0-bin-scala_2.12.tgz
cd flink-1.7.0
以本地模式啟動(dòng)Flink
在flink目錄下運(yùn)行以下命令即可啟動(dòng)
./bin/start-cluster.sh # Start Flink
這時(shí)候可以打開瀏覽器訪問http://localhost:8081
]flink的監(jiān)控頁(yè)面叁温。
我們通過(guò)jps可以看到多出來(lái)兩個(gè)JVM進(jìn)程,運(yùn)行的主類StandaloneSessionClusterEntrypoint和TaskManagerRunner冲九。
[root@henghe-121 bin]# jps -l
16016 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
16513 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
16619 sun.tools.jps.Jps
集群?jiǎn)?dòng)后就可以開發(fā)我們的flink程序了~
寫第一個(gè)Flink WorkCount程序
官網(wǎng)給出的Scala例子如下:
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// the port to connect to
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
return
}
}
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text = env.socketTextStream("localhost", port, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long)
}
代碼本身不難,不過(guò)建立scala的Maven工程泳炉,打成可執(zhí)行jar還需要很多設(shè)置。在這里我將官方的例子配置做了很多簡(jiǎn)化氧腰,對(duì)于初學(xué)者更友好刨肃,并將代碼上傳到了github上。大家clone之后黄痪,直接運(yùn)行mvn clean package就可以生成jar包盔然。
運(yùn)行WordCount
對(duì)于這個(gè)例子首先要用netcat創(chuàng)建服務(wù),監(jiān)聽某個(gè)端口挺尾。
nc -l 9000
之后就可以提交job任務(wù)了
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Starting execution of program
然后再nc端輸入要處理的字符串站绪,flink就可以拿到數(shù)據(jù)進(jìn)行處理
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye
該任務(wù)的輸出在flink的log下以.out結(jié)尾的文件下。
$ tail -f flink-yss-taskexecutor-0-bogon.out
WordWithCount(hello,1)
WordWithCount(sdsd,1)
WordWithCount(sdsd,1)
WordWithCount(sss,1)
最后測(cè)試完可以關(guān)閉集群:
./bin/stop-cluster.sh