Preface
In an earlier post on computing user profiles in real time with Flink, Flink needed to call an algorithm library written in Python. Both of the previous approaches had their problems, so we finally decided to develop with PyFlink.
Adopting a new technology inevitably means hitting all kinds of pitfalls: many of the commands were copied straight from the official docs and still failed. There was nothing to do but work through the issues one by one, so this post summarizes the problems encountered over the past two days of deploying PyFlink.
Deployment script
run.sh
#!/usr/bin/env bash
unset PYTHONPATH
export PYTHONPATH="/home/work/python3.7.1"
export FLINK_HOME="/home/work/flink-1.15.3"
if [ ! -f realtime_calc_label.zip ];then
zip -q -r ./realtime_calc_label.zip ./*
fi
# without this line, the alias command below has no effect inside the script
shopt -s expand_aliases
alias python=/home/work/python3.7.1/bin/python3
/home/work/flink-1.15.3/bin/flink run \
--detached \
-t yarn-per-job \
-Dyarn.application.name=flink_user_profile \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=3096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dyarn.application.queue=t0 \
-Dpython.systemenv.enabled=false \
-p 24 \
-pyarch /home/work/python3.7.1/python3.7.1.zip,./realtime_calc_label.zip \
-pyclientexec ./python3.7.1.zip/bin/python3 \
-pyexec ./python3.7.1.zip/bin/python3 \
-pyfs ./realtime_calc_label.zip \
--python ./label_calc_stream.py \
--jarfile jars/flink-sql-connector-kafka-1.15.2.jar
- In the same directory as run.sh sit the packaged project file realtime_calc_label.zip, the PyFlink entry point label_calc_stream.py, and the jars directory holding the Java jar dependencies
- -pyclientexec and -pyexec both use relative paths, because Flink copies all required resources into a temporary directory when the job is submitted (see the sketch after this list for a way to verify which interpreter actually runs)
- The YARN cluster already runs many PySpark jobs, and now PyFlink jobs as well, so supporting multiple Python versions is unavoidable. It is best to package your own Python environment and submit it together with the job: run cd /home/work/python3.7.1 && zip -q -r ./python3.7.1.zip ./* to package the environment
- A Python 2.x package under PYTHONPATH conflicts with a package inside our python3.7.1.zip, and the released Flink 1.15.2 has a bug in handling this case, so Flink has to be upgraded from 1.15.2 to 1.15.3. After reaching out to the Flink community we learned that 1.15.3 would be released in two weeks at the earliest, so we downloaded the release-1.15 branch source from GitHub and compiled and installed it ourselves
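As a quick way to confirm that the relative -pyexec path really resolves to the interpreter shipped inside python3.7.1.zip, a throwaway map function (hypothetical, not part of the project) can tag every record with the worker's interpreter path:

import sys

def tag_with_interpreter(value):
    # sys.executable is the interpreter the Python workers actually run under;
    # with -pyexec ./python3.7.1.zip/bin/python3 it should resolve inside the
    # archive extracted into the YARN container's working directory
    return '{} @ {}'.format(value, sys.executable)

Hook it in temporarily, e.g. source_ds.map(tag_with_interpreter), and inspect the sink output or the TaskManager logs.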
Compiling Python
As mentioned above, it is best to ship the Python dependencies you need when submitting PyFlink code. To keep the package as small as possible, we build a clean Python environment ourselves with the following commands:
cd /home/work
mkdir python3.9.0
# Note: in a private-cloud environment without direct internet access, download the tarball from the official site manually and upload it with a file-transfer tool
wget https://www.python.org/ftp/python/3.9.0/Python-3.9.0.tgz
tar -xzf Python-3.9.0.tgz
cd Python-3.9.0
./configure --prefix=/home/work/python3.9.0 --with-ssl
make
make install
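A quick sanity check with the freshly built interpreter confirms the version and that ssl support (what --with-ssl was for) actually made it in; sanity_check.py is a hypothetical throwaway script:

# sanity_check.py - run as: /home/work/python3.9.0/bin/python3 sanity_check.py
import ssl
import sys

print(sys.version)          # should report 3.9.0
print(ssl.OPENSSL_VERSION)  # the import itself fails if OpenSSL support is missing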
- Install dependencies
# upgrade pip
/home/work/python3.7.1/bin/python3 -m pip install --upgrade --force-reinstall pip
/home/work/python3.7.1/bin/python3 -m pip install apache-flink==1.15.2
# used by the business code to read configuration files
/home/work/python3.7.1/bin/python3 -m pip install configparser==5.0.0
/home/work/python3.7.1/bin/python3 -m pip install protobuf
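Before packaging, a quick import check with the same interpreter verifies that the packages landed in the intended environment; a throwaway snippet:

# run with /home/work/python3.7.1/bin/python3
import configparser
import google.protobuf  # provided by the protobuf package
import pyflink

print(pyflink.__file__)  # should resolve under /home/work/python3.7.1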
- Package the environment
# run this inside the Python directory to build the zip package
zip -q -r ./python3.7.1.zip ./*
# a gz package also works
tar -czvf ./python3.7.1.tgz ./*
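One detail worth verifying: run.sh references ./python3.7.1.zip/bin/python3, so bin/ must sit at the top level of the archive, which is why the zip is built from inside the Python directory. A small check, assuming the zip path from run.sh:

import zipfile

zf = zipfile.ZipFile('/home/work/python3.7.1/python3.7.1.zip')
# the interpreter must be listed as bin/python3, not python3.7.1/bin/python3
print([name for name in zf.namelist() if name.startswith('bin/python3')][:3])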
Compiling Flink
- Shell environment setup on macOS: edit the ~/.zshrc file
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home
SCALA_HOME=/Users/david.dong/soft/scala-2.12.13
MAVEN_HOME=/Users/david.dong/soft/apache-maven-3.6.3
export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$MAVEN_HOME/bin:$PATH:.
- Check which Java environment macOS currently uses
# run: /usr/libexec/java_home
david.dong@MacBook-Pro ~ % /usr/libexec/java_home
/Users/david.dong/Library/Java/JavaVirtualMachines/corretto-11.0.14.1/Contents/Home
# note that this differs from what the java -version command reports
david.dong@MacBook-Pro ~ % java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
- Compile the Flink source
# clone the source
git clone https://github.com/apache/flink.git
cd flink
# the released 1.15.2 has the bug and the fix already sits on the release-1.15 branch
# (to be released as 1.15.3), so check out that branch
git checkout release-1.15
# comment out the test-related modules in the source, then run the build;
# the built distribution ends up under flink/flink-dist/target/
mvn clean install -DskipTests -Drat.skip=true
PyFlink code
- Project structure
- Main logic
import json
import sys

from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer

from udf_lib.calc_label_func import CalcLabelFunc

sys.path.append(".")


def label_calc(profile):
    env = StreamExecutionEnvironment.get_execution_environment()
    # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
    # env.add_jars(
    #     "file:///Users/dengpengfei/PycharmProjects/realtime_calc_label/src/jars/flink-sql-connector-kafka-1.15.2.jar")
    job_conf = {'env': profile}
    env.get_config().set_global_job_parameters(job_conf)
    kafka_consumer = FlinkKafkaConsumer(
        topics='change_user_preview',
        deserialization_schema=SimpleStringSchema(),
        properties={'bootstrap.servers': '127.0.0.1:9092',
                    'group.id': 'offline',
                    'auto.offset.reset': 'earliest'})
    source_ds = env.add_source(kafka_consumer).name('source_kafka')
    map_df = source_ds.map(lambda row: json.loads(row))
    process_df = map_df.key_by(lambda row: row['unique_id']) \
        .flat_map(CalcLabelFunc(), output_type=Types.STRING()) \
        .name('flat_map_calc')
    kafka_producer = FlinkKafkaProducer(
        topic='change_label_preview',
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': '127.0.0.1:9092'})
    process_df.add_sink(kafka_producer).name('sink_kafka')
    env.execute('label_calc')


if __name__ == '__main__':
    run_env = 'preview'
    if len(sys.argv) > 1:
        run_env = sys.argv[1]
    label_calc(run_env)
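For reference, the job expects JSON messages on change_user_preview carrying at least the three fields the downstream UDF reads; a hypothetical sample record (values invented):

import json

# the UDF only relies on unique_id (the key), plus entity_type and version
# (used to build the MongoDB database names)
sample_event = {'unique_id': 'u_10001', 'entity_type': 'user', 'version': 'v1'}
print(json.dumps(sample_event))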
- UDF function
import json
import sys
import time

from pyflink.datastream import RuntimeContext, FlatMapFunction
from pymongo import MongoClient

from calc_lib.online_calc import OnlineCalc
from db_utils.mysql_util import get_sql_conn, get_dict_data_sql

sys.path.append(".")


class CalcLabelFunc(FlatMapFunction):

    def __init__(self):
        self.env = None
        self.mongo = None
        self.mysql_conf = None
        self.online_calc = None
        self.cur_time_stamp = None

    def open(self, runtime_context: RuntimeContext):
        # read back the 'env' value passed via set_global_job_parameters in the main job
        self.env = runtime_context.get_job_parameter('env', 'preview')
        self.mongo = MongoClient('mongodb://localhost:27017')
        self.mysql_conf = {
            'host': '127.0.0.1',
            'username': 'root',
            'password': '123456',
            'db': 'user_profile'
        }
        self.cur_time_stamp = 0

    def flat_map(self, value):
        # refresh the calculation config from MySQL at most once every 3 minutes
        if time.time() - self.cur_time_stamp > 60 * 3:
            self.cur_time_stamp = time.time()
            self.update_conf()
        unique_id = value['unique_id']
        entity_type = value['entity_type']
        version = value['version']
        pp_db = 'pp_{}_{}_{}'.format(entity_type, version, self.env)
        pp_doc = self.mongo[pp_db]['pp_table'].find_one({'unique_id': unique_id})
        profile_db = 'profile_{}_{}_{}'.format(entity_type, version, self.env)
        profile_doc = self.mongo[profile_db]['profile_table'].find_one({'unique_id': unique_id})
        if pp_doc:
            if not profile_doc:
                profile_doc = {}
            profile_new = self.online_calc.calc(pp_doc, profile_doc)
            self.mongo[profile_db]['profile_table'].replace_one(
                {'unique_id': unique_id}, profile_new, upsert=True)
            profile_new['entity_type'] = entity_type
            return [json.dumps(profile_new)]
        else:
            return []

    def close(self):
        self.mongo.close()

    def update_conf(self):
        con, cursor = get_sql_conn(self.mysql_conf)
        strategy_data = get_dict_data_sql(cursor, 'SELECT * FROM calc_strategy')
        relation_data = get_dict_data_sql(cursor, 'SELECT * FROM table_relation')
        con.close()
        self.online_calc = OnlineCalc(strategy_data, relation_data)
        print('update config ...')
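The MongoDB database names combine fields from the record with the env job parameter; a quick illustration with invented values:

# invented values, just to show the naming convention used in flat_map above
entity_type, version, env = 'user', 'v1', 'preview'
print('pp_{}_{}_{}'.format(entity_type, version, env))        # pp_user_v1_preview
print('profile_{}_{}_{}'.format(entity_type, version, env))   # profile_user_v1_preview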
- MySQL utility module
import pymysql


def get_sql_conn(conf):
    """
    Get a database connection and cursor.
    """
    conn = pymysql.connect(host=conf['host'], user=conf['username'],
                           password=conf['password'], db=conf['db'])
    cursor = conn.cursor()
    return conn, cursor


def get_index_dict(cursor):
    """
    Map each column name of the last query to its index in the result tuples.
    """
    index_dict = dict()
    index = 0
    for desc in cursor.description:
        index_dict[desc[0]] = index
        index = index + 1
    return index_dict


def get_dict_data_sql(cursor, sql):
    """
    Execute a SQL statement and convert the result rows (tuples by default)
    into dicts keyed by column name.
    """
    cursor.execute(sql)
    data = cursor.fetchall()
    index_dict = get_index_dict(cursor)
    res = []
    for datai in data:
        resi = dict()
        for indexi in index_dict:
            resi[indexi] = datai[index_dict[indexi]]
        res.append(resi)
    return res
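A minimal usage sketch for these helpers, assuming the same local MySQL instance and credentials as the UDF's mysql_conf:

from db_utils.mysql_util import get_sql_conn, get_dict_data_sql

conf = {'host': '127.0.0.1', 'username': 'root', 'password': '123456', 'db': 'user_profile'}
conn, cursor = get_sql_conn(conf)
rows = get_dict_data_sql(cursor, 'SELECT * FROM calc_strategy')
conn.close()
# each row comes back as a dict keyed by column name instead of a bare tuple
print(rows[:1])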
Summary
The main problem in this deployment was that we used a new Flink version whose submit command had changed. We also ran into a Flink 1.15.2 bug; the fix was to upgrade the version and add the configuration python.systemenv.enabled=false.