Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Affects Version/s: 1.18.0, 1.17.2
Environment: Python 3.7.16 or Python 3.9, YARN
Description
PyFlink fails with "No module named 'cloudpickle'" on Flink 1.18, while the same program works on Flink 1.17. The failure appeared after the change in FLINK-32034 (https://issues.apache.org/jira/browse/FLINK-32034).
Repro:
[hadoop@ip-1-2-3-4 ~]$ python --version
Python 3.7.16
[hadoop@ip-1-2-3-4 ~]$ rpm -qa | grep flink
flink-1.18.0-1.amzn2.x86_64
[hadoop@ip-1-2-3-4 ~]$ flink-yarn-session -d
[hadoop@ip-1-2-3-4 ~]$ flink run -py /tmp/batch_wc.py --output s3://prabhuflinks3/OUT2/
Error
ModuleNotFoundError: No module named 'cloudpickle'
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
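The ModuleNotFoundError above means the Python worker could not locate cloudpickle in any directory on its search path. The sketch below reproduces that effect in isolation using only the standard library: it creates a hypothetical stub cloudpickle package under a lib/ site-packages directory, then checks whether it can be found when only the lib64/ directory is searched versus when both are. The directory layout mirrors the paths in this report; the helper module_visible is illustrative and is not part of PyFlink.

```python
import importlib.machinery
import os
import tempfile
from typing import List


def module_visible(name: str, search_dirs: List[str]) -> bool:
    """Return True if top-level module `name` is found in search_dirs alone."""
    # With an explicit path list, PathFinder searches only these directories,
    # not sys.path.
    return importlib.machinery.PathFinder.find_spec(name, search_dirs) is not None


with tempfile.TemporaryDirectory() as prefix:
    lib = os.path.join(prefix, "lib", "python3.7", "site-packages")
    lib64 = os.path.join(prefix, "lib64", "python3.7", "site-packages")
    os.makedirs(lib64)
    # Hypothetical stub standing in for a dependency installed only under lib/.
    pkg_dir = os.path.join(lib, "cloudpickle")
    os.makedirs(pkg_dir)
    open(os.path.join(pkg_dir, "__init__.py"), "w").close()

    only_lib64 = module_visible("cloudpickle", [lib64])  # like the 1.18 result
    both = module_visible("cloudpickle", [lib, lib64])   # like the 1.17 result
    print(only_lib64, both)  # False True
```

When only the lib64/ directory is searched, the package installed under lib/ is invisible, which matches the failure described in the analysis.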
Analysis
1. On Flink 1.17 with Python 3.7.16, PythonEnvironmentManagerUtils#getSitePackagesPath returned the following two paths:
[root@ip-172-31-45-97 tmp]# python flink1.17-get_site_packages.py /tmp
/tmp/lib/python3.7/site-packages
/tmp/lib64/python3.7/site-packages
In Flink 1.18, FLINK-32034 changed PythonEnvironmentManagerUtils#getSitePackagesPath, and only one path is returned:
[root@ip-172-31-45-97 tmp]# python flink1.18-get_site_packages.py /tmp
/tmp/lib64/python3.7/site-packages
The PyFlink dependencies are installed in /tmp/lib/python3.7/site-packages, which getSitePackagesPath no longer returns in Flink 1.18. As a result, the Python worker cannot import cloudpickle and the PyFlink job fails.
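A minimal sketch of a site-packages lookup that keeps both locations, assuming the standard-library sysconfig install schemes (posix_prefix on POSIX, nt on Windows) describe the layout under the prefix. get_site_packages_paths is a hypothetical helper for illustration; it is not the code of PythonEnvironmentManagerUtils#getSitePackagesPath in either Flink release. It expands both the pure-Python (purelib) and platform-specific (platlib) paths, so on an installation where platlib resolves to lib64/ (as the Flink 1.18 output above suggests for this host) the lib/ directory is not dropped.

```python
import os
import sys
import sysconfig
from typing import List


def get_site_packages_paths(prefix: str) -> List[str]:
    """Hypothetical helper: list every site-packages dir under `prefix`.

    Returns the pure-Python ("purelib") location first, then the
    platform-specific ("platlib") location if it differs.
    """
    # Assumed scheme for a plain prefix install; "nt" on Windows.
    scheme = "nt" if os.name == "nt" else "posix_prefix"
    # Point both the pure and the platform base directories at the prefix.
    scheme_vars = {"base": prefix, "platbase": prefix}
    paths: List[str] = []
    for key in ("purelib", "platlib"):
        path = sysconfig.get_path(key, scheme, scheme_vars)
        if path not in paths:  # purelib and platlib may coincide
            paths.append(path)
    return paths


if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else sys.prefix
    for p in get_site_packages_paths(target):
        print(p)
```

Invoked as python get_site_packages.py /tmp, it mirrors the commands above; the exact output depends on how the interpreter's install scheme is configured.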
Attached: batch_wc.py, flink1.17-get_site_packages.py and flink1.18-get_site_packages.py.
Attachments