Running Python programs on Flink on YARN
Running Python programs in Flink local mode
Following the official docs, I wrote a Python program, Adder.py (code attached below). In local mode it starts fine with ./bin/pyflink.sh ./Adder.py:
? flink-1.7.1 ./bin/pyflink.sh ./Adder.py
Starting execution of program
Program execution finished
Job with JobID 8c7ab33a382c279b66089d43693fde52 has finished.
Job Runtime: 679 ms
The JobManager web UI shows a job with JobID 8c7ab33a382c279b66089d43693fde52, so local mode can run Python programs.
Running Python programs on Flink on YARN
First tried on server 6.34, which has a standalone Flink client that can connect to YARN and submit Flink jobs:
iknow@search-uda-6-34:~/wangbin/flink-1.7.2 $ ./bin/pyflink.sh ./Adder.py
By default the job was started on application_1550579025929_61820; even with the yid parameter specified, it still ran on application_1550579025929_61820.
So I switched to another machine, 0-107 (the steps of copying the Hadoop and Flink environments over are omitted here), and started a YARN session:
./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m
The startup log:
iknow@search-uda-0-107:~/wangbin/flink/flink-1.7.2 $ ./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m
2019-03-01 19:53:31,148 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-01 19:53:31,150 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-01 19:53:31,151 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-03-01 19:53:31,152 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-01 19:53:31,153 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-03-01 19:53:31,815 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-01 19:53:31,978 WARN org.apache.hadoop.conf.Configuration - /home/iknow/hadoop-client/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.defaultFS; Ignoring.
2019-03-01 19:53:31,980 WARN org.apache.hadoop.conf.Configuration - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir; Ignoring.
2019-03-01 19:53:31,981 WARN org.apache.hadoop.conf.Configuration - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir; Ignoring.
2019-03-01 19:53:31,986 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to iknow (auth:SIMPLE)
2019-03-01 19:53:32,451 INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at data-hadoop-112-16.bjrs.zybang.com/192.168.112.16:10200
2019-03-01 19:53:32,479 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument n is deprecated in will be ignored.
2019-03-01 19:53:32,542 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Looking for the active RM in [rm1, rm2]...
2019-03-01 19:53:32,662 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Found active RM [rm1]
2019-03-01 19:53:32,720 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '2048'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.
2019-03-01 19:53:32,720 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2096, numberTaskManagers=2, slotsPerTaskManager=1}
2019-03-01 19:53:33,265 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-01 19:53:33,274 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/iknow/wangbin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-01 19:53:42,592 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1550579025929_89782
2019-03-01 19:53:42,849 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1550579025929_89782
2019-03-01 19:53:42,849 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-03-01 19:53:42,853 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-03-01 19:53:50,718 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-03-01 19:53:51,320 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on search-as-107-45.bjcq.zybang.com:42915 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://search-as-107-45.bjcq.zybang.com:42915
The YARN UI shows this yarn session.
Clicking ApplicationMaster opens the JobManager web UI. Then start a Python program:
./bin/pyflink.sh Adder.py started the job on application_1550579025929_89782,
with JobID 5612d8a9b68a72633a7e2138df4537ba.
Start a second yarn session:
./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency -n 4 -jm 1024m -tm 2096m
The YARN UI shows a yarn session with ID application_1550579025929_89810.
Run the Python program again with the ./bin/pyflink.sh script.
The JobManager web UI now shows a job with JobID 899b20e2cde7b4ef29afc5b90e56325d.
In other words, ./bin/pyflink.sh runs the Python program on the most recently started yarn session.
Start a third yarn session:
./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency-2 -n 2 -jm 2048m -tm 4096m
The YARN UI shows a yarn session with ID application_1550579025929_89846.
Run the Python script with ./bin/pyflink.sh.
The JobManager shows a job with JobID 9d7d9d58ee886fd74a7ec56ff487b80d.
Adder.py
from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count, word = iterator.next()
        count += sum([x[0] for x in iterator])
        collector.collect((count, word))

env = get_environment()
data = env.from_elements("Who's there?",
                         "I think I hear them. Stand, ho! Who's there?")

data \
    .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
    .group_by(1) \
    .reduce_group(Adder(), combinable=True) \
    .output()

env.execute()
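To see what the pipeline computes, here is a plain-Python trace of the same word-count logic (standard library only, no Flink involved): flat_map emits (1, word) pairs, group_by(1) groups on the word field, and the Adder group-reduce sums the counts per word.

```python
from collections import defaultdict

lines = ["Who's there?",
         "I think I hear them. Stand, ho! Who's there?"]

# flat_map: one (1, word) tuple per lower-cased, whitespace-split token
pairs = [(1, word) for line in lines for word in line.lower().split()]

# group_by(1) + reduce_group(Adder()): sum the counts for each word
counts = defaultdict(int)
for count, word in pairs:
    counts[word] += count

print(sorted(counts.items()))
```

Running this prints each distinct token with its count, e.g. "who's" and "there?" both appear twice across the two lines.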
Data Sources and Data Sinks in Flink's Python API
According to the Flink documentation, the Python API supports the following data sources:
-
File-based:
read_text(path): reads files line-wise and returns them as strings.
read_csv(path, type): parses files of comma- (or another character-) delimited fields. Returns a DataSet of tuples. Supports the basic Java types and their Value counterparts as field types.
-
Collection-based:
from_elements(*args): creates a data set from a sequence; all elements become part of the data set.
generate_sequence(from, to): generates the sequence of numbers in the given interval, in parallel.
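In plain-Python terms, the two collection-based sources correspond roughly to the sketch below. These are stand-in functions to illustrate the semantics, not the Flink implementation (Flink distributes the work; here it is sequential):

```python
def from_elements(*args):
    # All arguments become elements of the data set.
    return list(args)

def generate_sequence(frm, to):
    # The numbers of the closed interval [frm, to]; Flink generates
    # this interval in parallel, this sketch does it sequentially.
    return list(range(frm, to + 1))

print(from_elements("Who's there?", "Stand, ho!"))
print(generate_sequence(1, 5))  # -> [1, 2, 3, 4, 5]
```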
Supported data sinks:
write_text(): writes elements line-wise as strings, obtained by calling the str() method of each element.
write_csv(...): writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the str() method of the objects.
output(): prints the str() value of each element to standard output.
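The str()-based serialization that the sinks describe can be illustrated in plain Python. The helper names below (to_text_line, to_csv_line) are hypothetical, introduced only to show the behavior the docs describe; they are not part of the Flink API:

```python
def to_text_line(element):
    # write_text() / output(): one str() call per whole element
    return str(element)

def to_csv_line(record, field_delimiter=","):
    # write_csv(): str() per field, joined by the configurable delimiter
    return field_delimiter.join(str(field) for field in record)

print(to_text_line((2, "who's")))
print(to_csv_line((2, "who's")))  # -> 2,who's
```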
Summary
Steps for running a Python program on Flink on YARN:
- Start a yarn session.
- Run the Python program with the ./bin/pyflink.sh script; it will run on the most recently started yarn session.
Problem encountered: when multiple yarn sessions are running, specifying yid does not direct the job to a particular yarn session.
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/python.html