PyFlink集群部署

前言

前面講到flink實時計算用戶畫像的功能,flink需要調(diào)用python寫的算法庫狗超,之前的兩種方案都存在各種各樣的問題,最后決定采用pyflink來開發(fā)。

一門新技術(shù)的引入必定面臨各種坑算芯,期間很多命令都是照著官網(wǎng)寫的,但是還是報錯凳宙。沒辦法熙揍,只能花時間一個一個的解決,本篇就總結(jié)下這兩天部署pyflink過程中遇到的問題氏涩。

部署腳本

run.sh

#!/usr/bin/env bash

unset PYTHONPATH

export PYTHONPATH="/home/work/python3.7.1"
export FLINK_HOME="/home/work/flink-1.15.3"

if [ ! -f realtime_calc_label.zip ];then
    zip -q -r ./realtime_calc_label.zip ./*
fi

# 不加這個alias命令會失效
shopt -s expand_aliases

alias python=/home/work/python3.7.1/bin/python3

/home/work/flink-1.15.3/bin/flink run \
--detached \
-t yarn-per-job \
-Dyarn.application.name=flink_user_profile \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=3096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dyarn.application.queue=t0 \
-Dpython.systemenv.enabled=false \
-p 24 \
-pyarch /home/work/python3.7.1/python3.7.1.zip,./realtime_calc_label.zip \
-pyclientexec ./python3.7.1.zip/bin/python3 \
-pyexec ./python3.7.1.zip/bin/python3 \
-pyfs ./realtime_calc_label.zip \
--python ./label_calc_stream.py \
--jarfile jars/flink-sql-connector-kafka-1.15.2.jar
  • 與run.sh同級目錄下有整個項目的打包文件realtime_calc_label.zip届囚,pyflink程序入口label_calc_stream.py,依賴Java的jar包文件夾jars
  • -pyclientexec是尖、-pyexec采用的都是相對路徑意系,因為flink作業(yè)提交的時候會把需要的資源都拷貝到臨時目錄下
  • 由于yarn集群運行著很多pyspark任務(wù),這次由要運行pyflink任務(wù)饺汹,所以不可避免需要支持多種版本的python環(huán)境蛔添,所以我們最好打包上自己的python環(huán)境和作業(yè)一起提交,這里需要執(zhí)行cd /home/work/python3.7.1 & zip -q -r ./python3.7.1.zip ./* 把python環(huán)境打包
  • 由于PYTHONPATH下python2.x的包跟自己python3.7.1.zip中有個包是沖突的兜辞,目前發(fā)布的flink版本1.15.2對于這種情況的處理存在bug迎瞧,這里需要把flink從1.15.2升級到1.15.3,與flink社區(qū)溝通后得知最快2周之后才能發(fā)布1.15.3版本逸吵,這里手動從github中下載flink的release-1.15 build源碼進行編譯安裝

編譯python

前面講到提交pyflink代碼凶硅,最好攜帶上自己需要的python依賴,為了盡量使包體小一些扫皱,這里自己打包一份純潔版的python環(huán)境足绅,命令如下:

cd /home/work

mkdir python3.9.0
#注意如果在私有云環(huán)境下,不能直接聯(lián)網(wǎng)下載韩脑,可手動上官網(wǎng)下載氢妈,再通過工具上傳
wget https://www.python.org/ftp/python/3.9.0/Python-3.9.0.tgz

cd Python-3.9.0

./configure --prefix=/home/work/python3.9.0  --with-ssl

make

make install
  • 下載依賴
# 更新pip版本
/home/work/python3.7.1/bin/python3 -m pip install --upgrade --force pip

/home/work/python3.7.1/bin/python3 -m pip install apache-flink==1.15.2
# 業(yè)務(wù)代碼中用到,讀取配置文件的
/home/work/python3.7.1/bin/python3 -m pip install configparser==5.0.0

/home/work/python3.7.1/bin/python3 -m pip install protobuf
  • 打包依賴
# 在python目錄下執(zhí)行命令打zip包
zip -q -r ./python3.7.1.zip ./*

# 也可以打成gz包
tar -czvf ./python3.7.1.tgz ./*

編譯Flink

  • mac下shell環(huán)境配置扰才。編輯~/.zshrc文件
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home
SCALA_HOME=/Users/david.dong/soft/scala-2.12.13
MAVEN_HOME=/Users/david.dong/soft/apache-maven-3.6.3

export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$MAVEN_HOME/bin:$PATH:.
  • 查看mac當前環(huán)境下所使用的java環(huán)境
# 命令行輸入: /usr/libexec/java_home
david.dong@MacBook-Pro ~ % /usr/libexec/java_home
/Users/david.dong/Library/Java/JavaVirtualMachines/corretto-11.0.14.1/Contents/Home

# 會發(fā)現(xiàn)跟采用:java -version命令看到的不一樣
david.dong@MacBook-Pro ~ % java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
  • 編譯flink源碼
# 下載源碼
git clone https://github.com/apache/flink.git
# 由于已發(fā)布的版本1.15.2有bug允懂,但是在1.15.3會修復(fù),代碼已經(jīng)修改衩匣,所以切換到release-1.15分支
git checkout -b release-1.15
# 在源碼中注釋掉test相關(guān)的module蕾总,然后執(zhí)行編譯命令粥航,編譯后的安裝包存放在flink/flink-dist/target/目錄下
mvn clean install -DskipTests -Drat.skip=true

PyFlink代碼

  • 項目結(jié)構(gòu)


    project structure
  • 主邏輯

import json
import sys
from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from udf_lib.calc_label_func import CalcLabelFunc

sys.path.append(".")


def label_calc(profile):
    env = StreamExecutionEnvironment.get_execution_environment()
    # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
    # env.add_jars(
    #     "file:///Users/dengpengfei/PycharmProjects/realtime_calc_label/src/jars/flink-sql-connector-kafka-1.15.2.jar")

    job_conf = {'env': profile}
    env.get_config().set_global_job_parameters(job_conf)

    kafka_consumer = FlinkKafkaConsumer(
        topics='change_user_preview',
        deserialization_schema=SimpleStringSchema(),
        properties={'bootstrap.servers': '127.0.0.1:9092',
                    'group.id': 'offline',
                    'auto.offset.reset': 'earliest'})

    source_ds = env.add_source(kafka_consumer).name('source_kafka')

    map_df = source_ds.map(lambda row: json.loads(row))

    process_df = map_df.key_by(lambda row: row['unique_id']).flat_map(CalcLabelFunc(), output_type=Types.STRING()).name('flat_map_calc')

    kafka_producer = FlinkKafkaProducer(
        topic='change_label_preview',
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': '127.0.0.1:9092'})

    process_df.add_sink(kafka_producer).name('sink_kafka')

    env.execute('label_calc')


if __name__ == '__main__':
    run_env = 'preview'
    if len(sys.argv) > 1:
        run_env = sys.argv[1]
    label_calc(run_env)
  • udf函數(shù)
import json
import sys
import time
from pyflink.datastream import RuntimeContext, FlatMapFunction
from pymongo import MongoClient
from calc_lib.online_calc import OnlineCalc
from db_utils.mysql_util import get_sql_conn, get_dict_data_sql

sys.path.append(".")


class CalcLabelFunc(FlatMapFunction):

    def __init__(self):
        self.env = None
        self.mongo = None
        self.mysql_conf = None
        self.online_calc = None
        self.cur_time_stamp = None

    def open(self, runtime_context: RuntimeContext):

        self.mongo = MongoClient('mongodb://localhost:27017')
        self.mysql_conf = {
            'host': '127.0.0.1',
            'username': 'root',
            'password': '123456',
            'db': 'user_profile'
        }
        self.cur_time_stamp = 0

    def flat_map(self, value):
        # update conf
        if time.time() - self.cur_time_stamp > 60 * 3:
            self.cur_time_stamp = time.time()
            self.update_conf()

        unique_id = value['unique_id']
        entity_type = value['entity_type']
        version = value['version']

        pp_db = 'pp_{}_{}_{}'.format(entity_type, version, self.env)
        pp_doc = self.mongo[pp_db]['pp_table'].find_one({'unique_id': unique_id})

        profile_db = 'profile_{}_{}_{}'.format(entity_type, version, self.env)
        profile_doc = self.mongo[profile_db]['profile_table'].find_one({'unique_id': unique_id})

        if pp_doc:
            if not profile_doc:
                profile_doc = {}
            profile_new = self.online_calc.calc(pp_doc, profile_doc)
            self.mongo[profile_db]['profile_table'].replace_one({'unique_id': unique_id}, profile_new, True)
            profile_new['entity_type'] = entity_type
            return [json.dumps(profile_new)]
        else:
            return []

    def close(self):
        self.mongo.close()

    def update_conf(self):
        con, cursor = get_sql_conn(self.mysql_conf)
        strategy_data = get_dict_data_sql(cursor, 'SELECT * FROM calc_strategy')
        relation_data = get_dict_data_sql(cursor, 'SELECT * FROM table_relation')
        con.close()

        self.online_calc = OnlineCalc(strategy_data, relation_data)

        print('update config ...')
  • mysql 工具類
import pymysql


def get_sql_conn(conf):
    """
    獲取數(shù)據(jù)庫連接
    """
    conn = pymysql.connect(host=conf['host'], user=conf['username'], password=conf['password'], db=conf['db'])
    cursor = conn.cursor()
    return conn, cursor


def get_index_dict(cursor):
    """
    獲取數(shù)據(jù)庫對應(yīng)表中的字段名
    """
    index_dict = dict()
    index = 0
    for desc in cursor.description:
        index_dict[desc[0]] = index
        index = index + 1
    return index_dict


def get_dict_data_sql(cursor, sql):
    """
    運行sql語句,獲取結(jié)果生百,并根據(jù)表中字段名递雀,轉(zhuǎn)化成dict格式(默認是tuple格式)
    """
    cursor.execute(sql)
    data = cursor.fetchall()
    index_dict = get_index_dict(cursor)
    res = []
    for datai in data:
        resi = dict()
        for indexi in index_dict:
            resi[indexi] = datai[index_dict[indexi]]
        res.append(resi)
    return res

結(jié)

本次部署主要遇到的問題是使用的flink的新版本,提交命令發(fā)生了變化蚀浆。然后也發(fā)現(xiàn)了一個flink1.15.2的bug缀程,解決方式是升級版本,添加配置 python.systemenv.enabled=false

https://issues.apache.org/jira/browse/FLINK-29479

https://github.com/apache/flink/pull/21110

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末市俊,一起剝皮案震驚了整個濱河市杨凑,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌摆昧,老刑警劉巖撩满,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異绅你,居然都是意外死亡伺帘,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門忌锯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來伪嫁,“玉大人,你說我怎么就攤上這事偶垮≌趴龋” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵似舵,是天一觀的道長晶伦。 經(jīng)常有香客問我,道長啄枕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任族沃,我火速辦了婚禮频祝,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘脆淹。我一直安慰自己常空,他們只是感情好,可當我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布盖溺。 她就那樣靜靜地躺著漓糙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪烘嘱。 梳的紋絲不亂的頭發(fā)上昆禽,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天蝗蛙,我揣著相機與錄音,去河邊找鬼醉鳖。 笑死捡硅,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的盗棵。 我是一名探鬼主播壮韭,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼纹因!你這毒婦竟也來了喷屋?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤瞭恰,失蹤者是張志新(化名)和其女友劉穎屯曹,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體寄疏,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡是牢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了陕截。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片驳棱。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖农曲,靈堂內(nèi)的尸體忽然破棺而出社搅,到底是詐尸還是另有隱情,我是刑警寧澤乳规,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布形葬,位于F島的核電站,受9級特大地震影響暮的,放射性物質(zhì)發(fā)生泄漏笙以。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一冻辩、第九天 我趴在偏房一處隱蔽的房頂上張望猖腕。 院中可真熱鬧,春花似錦恨闪、人聲如沸倘感。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽老玛。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蜡豹,已是汗流浹背麸粮。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留余素,地道東北人豹休。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像桨吊,于是被迫代替她去往敵國和親威根。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容