Flink作為一個(gè)分布式流式計(jì)算引擎讨韭,需要計(jì)算資源才可以執(zhí)行應(yīng)用程序结闸。Flink能夠與目前所有通用的資源管理框架集成嫁蛇,比如Hadoop YARN课幕、Apache Mesos和Kubernetes厦坛。除了運(yùn)行在現(xiàn)有資源管理器上五垮,F(xiàn)link還能夠獨(dú)立部署運(yùn)行(stand-alone cluster),也就是自身提供資源管理杜秸。
Standalone集群部署
Flink運(yùn)行在類Unix操作系統(tǒng)之上放仗,所以你可以部署在Linux、Mac OS X和Cygwin上亩歹。
環(huán)境準(zhǔn)備
- Java1.8.x及以上版本匙监。
- ssh安裝
Java需要配置JAVA_HOME環(huán)境變量,并且指向Java安裝目錄小作。
ssh需要在集群節(jié)點(diǎn)上配置免密碼登錄亭姥,并且Flink安裝目錄也要在所有節(jié)點(diǎn)的相同目錄上,這樣我們就可以使用Flink的腳本來(lái)管理它們了顾稀。
Flink集群由一個(gè)Master節(jié)點(diǎn)和一個(gè)或多個(gè)Worker節(jié)點(diǎn)組成达罗。
主機(jī) | 節(jié)點(diǎn)類型 | 服務(wù)名稱 |
---|---|---|
10.0.0.1 | master | jobmanager |
10.0.0.2 | worker | taskmanager |
10.0.0.3 | worker | taskmanager |
下載安裝
使用Flink并不是必須要提前安裝Hadoop,如果我們不打算使用Hadoop組件(比如HDFS静秆、YARN粮揉、HBase等),我們就可以下載沒(méi)有將Hadoop預(yù)編譯到Flink的二進(jìn)制安裝包上抚笔。
Flink下載頁(yè)面:https://flink.apache.org/downloads.html
tar xzf flink-*.tgz
cd flink-*
如果需要與Hadoop集成使用扶认,需要選擇預(yù)編譯好的Hadoop版本,或者自己編譯Flink源碼來(lái)指定Hadoop版本殊橙。
配置Flink
我們下載解壓縮Flink之后辐宾,就需要對(duì)Flink進(jìn)行配置了。Flink配置文件在${FLINK_HOME}/conf/
目錄中膨蛮,我們主要的配置文件為flink-conf.yaml
叠纹。
在flink-config.yaml配置文件添加以下基礎(chǔ)配置項(xiàng):
- jobmanager.rpc.address:指定master節(jié)點(diǎn)。
- jobmanager.heap.mb:為master節(jié)點(diǎn)JVM配置最大內(nèi)存敞葛。
- taskmanager.heap.mb:為worder節(jié)點(diǎn)JVM配置最大內(nèi)存誉察。
vim conf/flink-conf.yaml
jobmanager.rpc.address: 10.0.0.1
jobmanager.heap.mb: 2048m
taskmanager.heap.mb: 1024m
如果每個(gè)worker節(jié)點(diǎn)的內(nèi)存不同,想要為某些特定worker節(jié)點(diǎn)多指定一些內(nèi)存惹谐,則可以在特定節(jié)點(diǎn)上使用環(huán)境變量
FLINK_TM_HEAP
來(lái)覆蓋指定持偏。
配置完flink-conf.yaml后,還需要在conf/slaves文件中給出所有worker節(jié)點(diǎn)豺鼻。
vim conf/slaves
10.0.0.2
10.0.0.3
這里給出的是Flink的最簡(jiǎn)單集群配置综液,更多配置信息可以查看:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html。
節(jié)點(diǎn)分發(fā)
配置好后儒飒,我們需要將Flink目錄文件分發(fā)到集群中的所有其它節(jié)點(diǎn)上谬莹。
scp -r flink-1.7.2 root@10.0.0.2:/opt/yjz/flink/flink-1.7.2/
scp -r flink-1.7.2 root@10.0.0.3:/opt/yjz/flink/flink-1.7.2/
特別注意:所有節(jié)點(diǎn)的Flink所在目錄必須一致。
服務(wù)啟動(dòng)
節(jié)點(diǎn)分發(fā)后我們通過(guò)start-cluster.sh
腳本在Master節(jié)點(diǎn)上來(lái)啟動(dòng)Flink集群。由于我們?cè)趕laves文件里面指定了所有worker節(jié)點(diǎn)附帽,所以master節(jié)點(diǎn)會(huì)通過(guò)ssh免密碼登錄來(lái)啟動(dòng)所有worker節(jié)點(diǎn)埠戳。
bin/start-cluster.sh
啟動(dòng)成功后我們可以通過(guò)UI界面來(lái)查看:http://10.0.0.1:8081
停止服務(wù)
停止服務(wù)直接使用bin/stop-cluster.sh
腳本即可。
高可用部署
Flink集群的JobManager同時(shí)負(fù)責(zé)任務(wù)調(diào)度和資源管理(在Standalone模式下)蕉扮,可想而知它的壓力是很大的整胃。在默認(rèn)情況下Flink集群只會(huì)啟動(dòng)一個(gè)JobManager,這樣就會(huì)存在單點(diǎn)故障(single point of failure, SPOF)喳钟。
因此屁使,一般在生產(chǎn)環(huán)境中我們需要為Standalone模式或YARN模式提供Flink的高可用部署方式(High Availability, HA)。
這里我們給出Standalone部署模式的HA部署方案奔则,關(guān)于YARN的可以放到Flink on YARN部署文章中蛮寂。
Flink高可用部署原理
Flink的HA部署方案和業(yè)界的其它大數(shù)據(jù)處理框架HA基本一致:?jiǎn)?dòng)過(guò)程中啟動(dòng)多個(gè)JobManager,然后通過(guò)Zookeeper來(lái)選舉出一個(gè)leader易茬,其它JobManager作為備用主節(jié)點(diǎn)(standby)酬蹋。
下圖是描述在JobManager啟動(dòng)時(shí)首先選舉一個(gè)Leader,當(dāng)Leader節(jié)點(diǎn)掛掉后抽莱,從standby leader中再重新選舉出一個(gè)新的Leader節(jié)點(diǎn)范抓。
HA配置
Flink的JobManager使用Zookeeper作為分布式協(xié)調(diào)服務(wù),所以我們需要獨(dú)立部署一套Zookeeper食铐,然后在Flink上添加一些必要的配置匕垫。
配置masters文件
我們需要將所有JobManager節(jié)點(diǎn)配置到conf/masters文件中,配置內(nèi)容除了主機(jī)地址外虐呻,還需要配置webUI的端口年缎。
jobManagerAddress1: webUIPort1
[...]
jobManagerAddressN: webUIportN
默認(rèn)JobManager的通信端口是隨機(jī)啟動(dòng)的,我們也可以為Flink指定特定的通信端口铃慷,但是范圍只能是:50010,50011,50020-50025,50050-50075。(該配置在conf/flink-conf.yaml中)
high-availability.jobmanager.port: 50010
配置flink-conf.yaml文件
HA其余配置都在conf/flink-conf.yaml中蜕该,以下是基本配置:
#必要配置犁柜,通過(guò)Zookeeper啟動(dòng)HA模式
high-availability: zookeeper
#必要配置,指定Zookeeper連接地址堂淡,可以配置多個(gè)broker馋缅,防止Zk單節(jié)點(diǎn)掛掉
high-availability.zookeeper.quorum: 10.0.0.1:2181,10.0.0.2:2181
#建議配置,存儲(chǔ)JobManager集群節(jié)點(diǎn)的根目錄
high-availability.zookeeper.path.root: /flink
#建議配置绢淀,存儲(chǔ)協(xié)調(diào)數(shù)據(jù)的目錄
high-availability.cluster-id: /defaul_ns
#必須配置萤悴,JobManager中持久化元數(shù)據(jù)的存儲(chǔ)目錄(Zookeeper只存儲(chǔ)了狀態(tài)信息)
high-availability.storageDir: hdfs:///flink/recovery
啟動(dòng)集群
在啟動(dòng)Flink之前,要確保Zookeeper集群已經(jīng)啟動(dòng)并運(yùn)行皆的。
bin/start-cluster.sh
Flink自帶Zookeeper集群
對(duì)于測(cè)試集群覆履,如果我們沒(méi)有Zookeeper實(shí)例集群,可以使用Flink自帶的Zookeeper集群。在conf/zoo.cfg下有Zookeeper的配置模板硝全。我們可以指定運(yùn)行Zookeeper服務(wù)的節(jié)點(diǎn):
server.1= address1:peerPort1:leaderPort1
[...]
server.X= addressX:peerPort:leaderPort
然后使用Flink自帶的腳本啟動(dòng)Zookeeper集群:bin/start-zookeeper-quorum.sh
栖雾。
對(duì)于生產(chǎn)環(huán)境,建議使用自己的獨(dú)立Zookeeper集群伟众。
單機(jī)部署
我們?cè)趯W(xué)習(xí)或測(cè)試的時(shí)候可以使用本地單機(jī)Flink模式,單機(jī)Flink部署方式非常簡(jiǎn)單,我們只需要將Flink二進(jìn)制安裝包下載下來(lái)后句灌,直接使用bin/flink-start.sh
來(lái)啟動(dòng)即可悟耘。
默認(rèn)conf/flink-conf.yaml中的jobmanager.rpc.address配置項(xiàng)為localhost,slaves文件中也為localhost先紫,所以會(huì)在本地即啟動(dòng)jobmanager又啟動(dòng)taskmanager治泥。
Flink CLI
Flink提供了命令行接口(Command-Line Interface,CLI)來(lái)運(yùn)行作業(yè)jar包和控制作業(yè)執(zhí)行。CLI位于bin/flink
目錄下泡孩。
Flink CLI格式如下:
./flink <ACTION> [OPTIONS] [ARGUMENTS]
ACTION包含了一下幾類:
ACTION | 使用方式 | 說(shuō)明 |
---|---|---|
run | ./flink run [OPTIONS] <jar-file> <arguments> | 編譯并運(yùn)行作業(yè) |
info | ./flink info [OPTIONS] <jar-file> <arguments> | 以JSON的形式顯示程序的優(yōu)化執(zhí)行計(jì)劃 |
list | ./flink list [OPTION] | 列出正在執(zhí)行和調(diào)度(scheduled)的任務(wù) |
stop | ./flink stop [OPTION] <Job ID> | 停止運(yùn)行的程序车摄,注意使用stop只能停止流處理作業(yè)。 |
cancel | ./flink cancel [OPTION] <Job ID> | 取消運(yùn)行的程序 |
savepoint | ./flink savepoint [OPTIONS] <Job ID> [<target directory>] | 為運(yùn)行中的作業(yè)觸發(fā)保存點(diǎn) |
modify | ./flink modify <Job ID> [OPTIONS] | 修改運(yùn)行中的程序 |
具體OPTIONS參數(shù)我們可以通過(guò)./flink
命令詳細(xì)查看使用仑鸥。
stop和cancel雖然都是停止作業(yè)吮播,但是兩者實(shí)現(xiàn)是不一樣的。使用cancel方法作業(yè)中的operator會(huì)立即接收到停止命令眼俊,來(lái)取消任務(wù)意狠。如果operator沒(méi)有取消任務(wù),F(xiàn)link開(kāi)始定期中斷線程疮胖,直到它停止环戈。而stop是以一種更優(yōu)雅的方式來(lái)停止作業(yè),使用Stop停止作業(yè)澎灸,任務(wù)數(shù)據(jù)源需要實(shí)現(xiàn)
StoppableFunction
接口院塞,這樣當(dāng)收到stop命令時(shí),數(shù)據(jù)源首先停止發(fā)送數(shù)據(jù)性昭,然后等待集群中的作業(yè)執(zhí)行完成拦止,最后正常停止作業(yè)。