1、環(huán)境準備
首先役纹,準備 python 虛擬環(huán)境。2020年11月3日時的 pyflink 的最高版本為 1.11.2,請開發(fā)者按照實際需要或者線上環(huán)境要求來指定 pyflink 版本涵妥。
wget https://ci.apache.org/projects/flink/flink-docs-release-1.11/downloads/setup-pyflink-virtual-env.sh
sh setup-pyflink-virtual-env.sh 1.11.2
source venv/bin/activate # 激活虛擬環(huán)境
setup-pyflink-virtual-env.sh
腳本會自動安裝 miniconda 并在當前目錄下創(chuàng)建虛擬環(huán)境文件夾 venv,然后自動安裝 apache-flink 模塊后壓縮虛擬環(huán)境為 venv.zip坡锡。
2蓬网、腳本開發(fā)
Pyflink 腳本開發(fā)請認真參考 官方文檔v1.11窒所。
Pyflink 正在快速發(fā)展的階段,每次版本更新都會增加很多新的特性帆锋,同時會取消舊特性吵取,因此務必確保開發(fā)時所參考的文檔與本地 pyflink 版本一致。
3窟坐、Jar 包依賴
Flink 中的 Jar 包是 connector 的擴展海渊,允許在 flink 腳本中連接和使用各種數(shù)據(jù)存儲工具,包括:
- 文件系統(tǒng)哲鸳,如 HDFS臣疑,S3
- 數(shù)據(jù)庫,如 MySQL徙菠,MongDB
- 消息隊列讯沈,如 Kafka
Pyflink 默認支持有限的幾種 jar 包,如有特殊需要(例如以 json 格式來消費 kafka 里的數(shù)據(jù))婿奔,需要手動指定腳本依賴的 jar 包所在的路徑缺狠。
已知有 3 種方式來指定 jar 包依賴。
3.1萍摊、方法1:在 pyflink 腳本中指定
在腳本中完成 TableEnvironment 的初始化后挤茄,添加下面的腳本以指定 jar 包路徑(多個 jar 包的路徑用 ; 隔開)。
table_env.get_config().get_configuration().set_string("pipeline.jars", 'xxxx.jar;xxxxx.jar')
注意冰木,本地環(huán)境的 jar 包路徑與線上環(huán)境的 jar 包路徑可能不同穷劈,因此每次提交到線上時還需要修改腳本中的路徑為對應的路徑。
3.2踊沸、方法2:在 pyflink 模塊的 lib 目錄下歇终。
找到 pyflink 模塊的安裝路徑,以及對應的 lib 目錄逼龟。
import pyflink
import os
print(os.path.join(os.path.dirname(pyflink.__file__), 'lib'))
然后使用 cp 命令復制 jar 包到 lib 目錄下即可评凝。
這種方法一次運行,一勞永逸腺律,比較適合本地開發(fā)奕短。
3.3、方法3:flink run -j
這種方式不適用于本地開發(fā)匀钧,而是用于提交到集群上時指定 jar 包的路徑篡诽,但為了較為系統(tǒng)地介紹 jar 包依賴的指定方式,故在此介紹榴捡。
命令如下:
flink run -m yarn-cluster \
-j flink-sql-connector-kafka_2.11-1.11.2.jar \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python3 \
-py main.py
通過 -j 參數(shù)來指定一個 jar 包路徑杈女,多個 jar 包則使用多個 -j 。
4、本地測試
Flink 支持使用 local-singleJVM 模式 來進行本地測試达椰,即只需簡單的執(zhí)行 Python xxxx.py
命令翰蠢,pyflink 就會默認啟動一個 local-singleJVM 的 flink 環(huán)境來執(zhí)行作業(yè)。
在運行過程中啰劲,可以另起終端梁沧,輸入 jps 來查看 java 進程。