Flink on yarn運(yùn)行python程序

Flink on yarn運(yùn)行python程序

Flink本地local模式運(yùn)行python程序

參考官網(wǎng)寫了python程序Adder.py(后面貼上代碼)观话,在本地local模式用./bin/pyflink.sh ./Adder.p啟動(dòng)是可以的

?  flink-1.7.1 ./bin/pyflink.sh ./Adder.py
Starting execution of program
Program execution finished
Job with JobID 8c7ab33a382c279b66089d43693fde52 has finished.
Job Runtime: 679 ms

jobManager web ui上查看是有JobID 為8c7ab33a382c279b66089d43693fde52的任務(wù)的。


可以看到本地local模式是可以運(yùn)行python程序的拼余。

Flink on yarn運(yùn)行python程序

在服務(wù)器6.34上執(zhí)行(6.34上有單機(jī)的flink客戶端闸拿,可以連接yarn空盼,向yarn提交flink程序)
iknow@search-uda-6-34:~/wangbin/flink-1.7.2 $ ./bin/pyflink.sh ./Adder.py

默認(rèn)會(huì)起動(dòng)到application_1550579025929_61820上,指定了yid參數(shù)也會(huì)默認(rèn)跑在application_1550579025929_61820上

決定換一臺(tái)機(jī)器試試新荤,到0-107上揽趾,這里省略拷貝hadoop和flink環(huán)境的過程,
啟動(dòng)一個(gè)yarn session:

./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m

啟動(dòng)日志如下:

iknow@search-uda-0-107:~/wangbin/flink/flink-1.7.2 $ ./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m
2019-03-01 19:53:31,148 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-01 19:53:31,150 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-03-01 19:53:31,152 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-03-01 19:53:31,153 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2019-03-01 19:53:31,815 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-01 19:53:31,978 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-03-01 19:53:31,980 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-03-01 19:53:31,981 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
2019-03-01 19:53:31,986 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to iknow (auth:SIMPLE)
2019-03-01 19:53:32,451 INFO  org.apache.hadoop.yarn.client.AHSProxy                        - Connecting to Application History server at data-hadoop-112-16.bjrs.zybang.com/192.168.112.16:10200
2019-03-01 19:53:32,479 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument n is deprecated in will be ignored.
2019-03-01 19:53:32,542 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Looking for the active RM in [rm1, rm2]...
2019-03-01 19:53:32,662 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Found active RM [rm1]
2019-03-01 19:53:32,720 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '2048'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.
2019-03-01 19:53:32,720 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2096, numberTaskManagers=2, slotsPerTaskManager=1}
2019-03-01 19:53:33,265 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-01 19:53:33,274 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/wangbin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-01 19:53:42,592 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1550579025929_89782
2019-03-01 19:53:42,849 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1550579025929_89782
2019-03-01 19:53:42,849 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-03-01 19:53:42,853 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-03-01 19:53:50,718 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-03-01 19:53:51,320 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on search-as-107-45.bjcq.zybang.com:42915 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://search-as-107-45.bjcq.zybang.com:42915

yarn上是有這個(gè)yarn session的


點(diǎn)擊ApplicationMaster進(jìn)去JobManager的web ui苛骨,啟動(dòng)一個(gè)python程序
./bin/pyflink.sh Adder.py程序起動(dòng)到了ID為application_1550579025929_89782 的APPID上


JobID為5612d8a9b68a72633a7e2138df4537ba篱瞎。

再啟動(dòng)一個(gè)yarn session
./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency -n 4 -jm 1024m -tm 2096m


在yarn上看到有ID為application_1550579025929_89810的yarn session


再用./bin/pyflink.sh腳本運(yùn)行python程序


查看JobManager 的web ui上看到是有899b20e2cde7b4ef29afc5b90e56325d的Job ID的。


也就是./bin/pyflink.sh腳本會(huì)將python程序運(yùn)行在最新啟動(dòng)的yarn session上痒芝。

再啟動(dòng)一個(gè)yarn session

./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency-2 -n 2 -jm 2048m -tm 4096m

Yarn上看到有ID為application_1550579025929_89846的yarn session


./bin/pyflink.sh腳本運(yùn)行python腳本


到JobManager上看到有jobID為9d7d9d58ee886fd74a7ec56ff487b80d的任務(wù)俐筋。


Adder.py

from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator])
    collector.collect((count, word))

env = get_environment()
data = env.from_elements("Who's there?",
 "I think I hear them. Stand, ho! Who's there?")

data \
  .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
  .group_by(1) \
  .reduce_group(Adder(), combinable=True) \
  .output()

env.execute()

flink官網(wǎng)python的Datasource和DataSink

從flink官網(wǎng)看到python的api支持的數(shù)據(jù)源(Data Source)如下:

  • 基于文件的:

    • read_text(path):按行讀取文件并將其作為字符串返回。

    • read_csv(path, type):解析逗號(hào)(或其他字符)分隔字段的文件严衬。返回元組的DataSet澄者。支持基本java類型及其Value對應(yīng)作為字段類型。

  • 基于集合:

    • from_elements(*args):從Seq創(chuàng)建數(shù)據(jù)集请琳。所有數(shù)據(jù)元

    • generate_sequence(from, to):并行生成給定間隔中的數(shù)字序列粱挡。

支持的Data Sink

  • write_text():按字符串順序?qū)懭霐?shù)據(jù)元。通過調(diào)用每個(gè)數(shù)據(jù)元的str()方法獲得字符串单起。

  • write_csv(...):將元組寫為逗號(hào)分隔值文件抱怔。行和字段分隔符是可配置的。每個(gè)字段的值來自對象的str()方法嘀倒。

  • output():打印標(biāo)準(zhǔn)輸出上每個(gè)數(shù)據(jù)元的str()值屈留。

總結(jié)

Flink on yarn運(yùn)行python程序的步驟:

    1. 開啟一個(gè)yarn session
    1. 用 ./bin/pyflink.sh腳本運(yùn)行python程序,程序會(huì)跑在最新啟動(dòng)的yarn session上测蘑。
  • 遇到的問題灌危,如果啟動(dòng)多個(gè)yarn session,無法通過指定yid來運(yùn)行到不同yarn session上碳胳。

參考:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/python.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末勇蝙,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子挨约,更是在濱河造成了極大的恐慌味混,老刑警劉巖产雹,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異翁锡,居然都是意外死亡蔓挖,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門馆衔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瘟判,“玉大人,你說我怎么就攤上這事角溃】交瘢” “怎么了?”我有些...
    開封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵减细,是天一觀的道長匆瓜。 經(jīng)常有香客問我,道長邪财,這世上最難降的妖魔是什么陕壹? 我笑而不...
    開封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮树埠,結(jié)果婚禮上糠馆,老公的妹妹穿的比我還像新娘。我一直安慰自己怎憋,他們只是感情好又碌,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著绊袋,像睡著了一般毕匀。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上癌别,一...
    開封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天皂岔,我揣著相機(jī)與錄音,去河邊找鬼展姐。 笑死躁垛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的圾笨。 我是一名探鬼主播教馆,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼擂达!你這毒婦竟也來了土铺?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎悲敷,沒想到半個(gè)月后究恤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡后德,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年丁溅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片探遵。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖妓柜,靈堂內(nèi)的尸體忽然破棺而出箱季,到底是詐尸還是另有隱情,我是刑警寧澤棍掐,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布藏雏,位于F島的核電站,受9級(jí)特大地震影響作煌,放射性物質(zhì)發(fā)生泄漏掘殴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一粟誓、第九天 我趴在偏房一處隱蔽的房頂上張望奏寨。 院中可真熱鬧,春花似錦鹰服、人聲如沸病瞳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽套菜。三九已至,卻和暖如春设易,著一層夾襖步出監(jiān)牢的瞬間逗柴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來泰國打工顿肺, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留戏溺,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓挟冠,卻偏偏與公主長得像于购,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子知染,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Flink on yarn部署模式 背景 Flink是一個(gè)高性能肋僧,高吞吐,低延遲的流處理框架。它不僅僅是作為一個(gè)流...
    it_zzy閱讀 72,987評(píng)論 5 31
  • 1嫌吠、Standalone 軟件要求: Java 1.8.x or higher ssh JAVA_HOME配置 Y...
    JACKbayue閱讀 17,397評(píng)論 0 5
  • 1.下載Flink壓縮包 下載地址:http://flink.apache.org/downloads.html止潘。...
    尼小摩閱讀 62,965評(píng)論 2 22
  • 今天去看了前任,現(xiàn)在還在電影的劇情里辫诅。走心凭戴,更扎心。一路上跟胡老師影評(píng)炕矮,都覺得劇情不是太怎么樣么夫,但是像極了人...
    輕瑜伽閱讀 232評(píng)論 0 0
  • 地上的枯葉在抓蜻蜓的影子,風(fēng)停了的時(shí)候肤视,自己又會(huì)是在哪里档痪?枯葉會(huì)不會(huì)也是因?yàn)檫@樣想,被風(fēng)牽過思緒帶走邢滑,隨風(fēng)走了腐螟。風(fēng)...
    4e38ebe82a21閱讀 191評(píng)論 0 1