This page records problems encountered while running PyFlink, along with their solutions, so that others can avoid the same pitfalls.
Q1: No module named 'encodings'
Caused by: java.io.IOException: Failed to execute the command: venv.zip/venv/bin/python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Could not find platform independent libraries <prefix>
Could not find platform dependent libraries <exec_prefix>
Consider setting $PYTHONHOME to <prefix>[:<exec_prefix>]
Fatal Python error: initfsencoding: unable to load the file system codec
ModuleNotFoundError: No module named 'encodings'
Analysis: a Python environment problem, encountered when managing virtual environments with virtualenv.
Solution: use the following script (adjust the PyFlink version as needed) to build the virtual environment with miniconda3 instead.
wget https://ci.apache.org/projects/flink/flink-docs-release-1.11/downloads/setup-pyflink-virtual-env.sh
sh setup-pyflink-virtual-env.sh 1.11.2
source venv/bin/activate # activate the virtual environment
Q2: SyntaxError: Non-ASCII character
File "main.py", line 2
SyntaxError: Non-ASCII character '\xe5' in file main.py on line 3, but no encoding declared; see http://www.python.org/peps/pep-0263.html for details
org.apache.flink.client.program.ProgramAbortException
Analysis: on the surface this is a failure to parse non-ASCII characters, but the real cause is the wrong Python version. When a Python job is submitted via flink run, Flink invokes the "python" command, which must resolve to Python 3.5, 3.6, or 3.7.
Solution: activate the virtual environment so that running python -V
reports Python 3.5, 3.6, or 3.7.
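The version requirement can also be checked programmatically. The helper below is a sketch with a hypothetical name; it simply encodes the 3.5/3.6/3.7 constraint stated above:

```python
import sys

def is_supported_python(version_info=None):
    """True if the (major, minor) version is one that PyFlink 1.11 accepts."""
    v = version_info or sys.version_info
    return (v[0], v[1]) in {(3, 5), (3, 6), (3, 7)}
```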
Q3: Could not find any factory for identifier 'kafka'
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
org.apache.flink.client.program.ProgramAbortException
Analysis: the Kafka connector jar was not specified.
Solution: add the parameter -j flink-sql-connector-kafka_2.11-1.11.2.jar
to the flink run command. See the Apache Kafka SQL Connector page for download links, and pick the jar that matches your Kafka version.
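For context, a table definition that triggers this factory lookup might look like the sketch below; the table name, topic, broker address, and schema are placeholders, not from the original post. Flink only searches for the 'kafka' factory when such DDL is evaluated, which is why the missing jar surfaces at runtime:

```python
# Placeholder DDL; executing it via t_env.execute_sql(kafka_ddl) raises
# Q3's ValidationException unless the connector jar is on the classpath.
kafka_ddl = """
CREATE TABLE kafka_source (
    msg STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
)
"""
```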
Q4: The parallelism must be a positive number: yarch
The parallelism must be a positive number: yarch
Analysis: the parallelism must be set to a positive number.
Solution: add the parameter -p 1 to the flink run command.
The root cause, however, may lie with Flink itself: a directly deployed Flink works fine, but after integration into CDH the Flink configuration parameters changed, so jobs can no longer be run with a plain flink run.
Q5: No manifest found in jar file '/xxxx/venv.zip'
org.apache.flink.client.program.ProgramInvocationException: No manifest found in jar file '/xxxx/venv.zip'. The manifest is need to point to the program's main class.
Analysis: no manifest file was found to specify the program's main class.
Solution: same as Q4; Flink may not have been installed correctly.
Q6: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
Analysis: the jar does not specify a 'Main-Class' or 'program-class' entry.
Solution: same as Q4; Flink may not have been installed correctly.
Q7: java.net.MalformedURLException: no protocol:
py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL.
: java.net.MalformedURLException: no protocol:
Analysis: "no protocol" means no communication scheme was specified.
Solution: look at the line where the error is raised. If it involves a URL, check whether http:// is missing; if it involves a file path, check whether file:// is missing.
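That check can be automated before handing strings to the Java side. The helper below is a hypothetical sketch, assuming that scheme-less strings are local paths that should become file:// URLs:

```python
from urllib.parse import urlparse

def ensure_scheme(path_or_url):
    """Prepend file:// to a scheme-less string so that java.net.URL
    accepts it instead of throwing MalformedURLException."""
    if urlparse(path_or_url).scheme == "":
        return "file://" + path_or_url
    return path_or_url
```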
Q8: Method registerFunction(...) does not exist
py4j.protocol.Py4JError: An error occurred while calling o4.registerFunction. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method registerFunction([class java.lang.String, class com.flink.udf.TopN]) does not exist
Analysis: the registerFunction method does not exist. From the source code, registerFunction is a method on the TableEnvironment._j_env object, so the suspicion is that _j_env was not set up correctly. _j_env refers to the underlying Java execution environment.
Solution: pass the execution environment explicitly when creating the TableEnvironment. An example follows.
The original way of creating it:
# streaming environment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
The corrected way; note that the first argument to create is the env variable initialized just before:
# streaming environment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
The same applies to batch processing:
from pyflink.dataset import ExecutionEnvironment
env = ExecutionEnvironment.get_execution_environment()
...
Q9: Caused by: org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
Caused by: org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:404)
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:216)
Analysis: the TaskExecutor is not running. Run jps to check whether a TaskManagerRunner process exists.
Solution: restart Flink.
Q10: Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
...
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness: xxxx/site-packages/pyflink/fn_execution/boot.py --id=1-1 --logging_endpoint=localhost:58957 --artifact_endpoint=localhost:58958 --provision_endpoint=localhost:58959 --control_endpoint=localhost:58956
...
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Server sendMessage() failed with Error"
debug_error_string = "{"created":"@1605690803.729786000","description":"Error received from peer ipv6:[::1]:58959","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Server sendMessage() failed with Error","grpc_status":1}"
>
...
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
...
Caused by: java.lang.IllegalStateException: Process died with exit code 0
...
Analysis: sending a message to the RPC service failed; the exact cause is unknown.
Solution: resubmit the Flink job.
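Since the failure looks transient, resubmission can be wrapped in a small retry loop. This is a generic sketch, not a PyFlink API; the names are hypothetical, and `submit` stands for whatever callable performs the submission:

```python
import time

def submit_with_retry(submit, attempts=3, delay=1.0):
    """Call submit() up to `attempts` times, sleeping `delay` seconds
    between tries; re-raise the last error if every attempt fails."""
    last_error = None
    for _ in range(attempts):
        try:
            return submit()
        except RuntimeError as exc:
            last_error = exc
            time.sleep(delay)
    raise last_error
```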
Q11: Process died with exit code 127
Background: I wanted to use a dependency (such as faker) inside a UDF of a streaming job, so I wrote a requirements.txt and ran the pip download command below to fetch the dependency's packages into the cached_dir directory, which, as shown in the screenshot above, contained 4 files. I then submitted the job to the cluster in yarn-cluster mode, but hit the error below: the job's run state was finished (a streaming job should never finish) and its final state was failed.
pip download -d cached_dir -r requirements.txt --no-binary :all:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
...
java.lang.RuntimeException: Failed to create stage bundle factory!
...
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 127
...
Caused by: java.lang.IllegalStateException: Process died with exit code 127
...
Analysis: by changing one variable at a time, the problem was narrowed down to the Python dependency packages. cached_dir contained python-dateutil, a package that is usually already present in the Python environment, so the guess is that installing it caused a conflict.
Solution: after deleting python-dateutil-2.8.1.tar.gz the problem went away. The final cached_dir contained only the 3 required packages.
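To spot the offending archive systematically, one can scan the cached_dir listing for distributions that are already present in the target environment. Both the helper and the filenames below are hypothetical illustrations, mirroring the 4-file situation described above:

```python
def find_distribution(filenames, dist_name):
    """Return the sdist files in a cached_dir listing that belong to the
    given distribution name, e.g. 'python-dateutil'."""
    return [f for f in filenames if f.startswith(dist_name + "-")]

# Hypothetical cached_dir contents before the fix:
cached = [
    "Faker-4.14.0.tar.gz",
    "python-dateutil-2.8.1.tar.gz",
    "six-1.15.0.tar.gz",
    "text-unidecode-1.3.tar.gz",
]
```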