  Flink: FLINK-33529

PyFlink fails with "No module named 'cloudpickle'"


Details

    Description

      PyFlink fails with "No module named 'cloudpickle'" on Flink 1.18. The same program works fine on Flink 1.17. The failure started with the change in FLINK-32034 (https://issues.apache.org/jira/browse/FLINK-32034).

      Repro:

      [hadoop@ip-1-2-3-4 ~]$ python --version
      Python 3.7.16
      
      [hadoop@ip-1-2-3-4 ~]$ rpm -qa | grep flink
      flink-1.18.0-1.amzn2.x86_64
      
      [hadoop@ip-1-2-3-4 ~]$ flink-yarn-session -d
      
      [hadoop@ip-1-2-3-4 ~]$ flink run -py /tmp/batch_wc.py --output s3://prabhuflinks3/OUT2/
      

      Error

      ModuleNotFoundError: No module named 'cloudpickle'
      
      	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
      	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
      	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
      	at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
      	at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
      	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      

      Analysis

      1. On Flink 1.17 with Python 3.7.16, PythonEnvironmentManagerUtils#getSitePackagesPath returned the following two paths:

      [root@ip-172-31-45-97 tmp]# python flink1.17-get_site_packages.py /tmp
      /tmp/lib/python3.7/site-packages
      /tmp/lib64/python3.7/site-packages
      

      whereas Flink 1.18 (FLINK-32034) changed PythonEnvironmentManagerUtils#getSitePackagesPath so that only one path is returned:

      [root@ip-172-31-45-97 tmp]# python flink1.18-get_site_packages.py /tmp
      /tmp/lib64/python3.7/site-packages
      [root@ip-172-31-45-97 tmp]#
      

      The PyFlink dependencies are installed in "/tmp/lib/python3.7/site-packages". Flink 1.18's getSitePackagesPath no longer returns this path, so the dependencies cannot be found and the PyFlink job fails.
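To illustrate the failure mode described above, here is a hypothetical diagnostic sketch (not part of Flink; `find_module_dir` is an illustrative name) that checks which of a set of site-packages directories provides a given module. It uses the standard-library `importlib.machinery.PathFinder.find_spec`, restricted to the directories passed in, so a `None` result corresponds to the `ModuleNotFoundError` raised by the job. The two directory lists are the Flink 1.17 and Flink 1.18 outputs shown above.

```python
import importlib.machinery


def find_module_dir(module_name, search_dirs):
    """Return the first directory in ``search_dirs`` that provides ``module_name``.

    Hypothetical diagnostic helper: PathFinder is the finder used for sys.path
    imports, but here it only searches the given directories. Returns None when
    the module is not found (the equivalent of ModuleNotFoundError).
    """
    for directory in search_dirs:
        # Nonexistent directories are skipped by the path hooks and yield None.
        spec = importlib.machinery.PathFinder.find_spec(module_name, [directory])
        if spec is not None:
            return directory
    return None


if __name__ == "__main__":
    # Paths returned by getSitePackagesPath in the outputs above.
    flink_117_dirs = [
        "/tmp/lib/python3.7/site-packages",
        "/tmp/lib64/python3.7/site-packages",
    ]
    flink_118_dirs = ["/tmp/lib64/python3.7/site-packages"]
    print("1.17 paths:", find_module_dir("cloudpickle", flink_117_dirs))
    print("1.18 paths:", find_module_dir("cloudpickle", flink_118_dirs))
```

On the reporter's host, the 1.17 lookup would presumably return the lib directory and the 1.18 lookup would return None. On machines without that layout, both lookups will simply print None.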

      Attached: batch_wc.py, flink1.17-get_site_packages.py, and flink1.18-get_site_packages.py.
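For comparison, here is a minimal sketch of a helper that returns both site-packages directories for a prefix, using the standard-library `sysconfig` module. This is one possible approach under stated assumptions. It is not the code in Flink 1.17, Flink 1.18, or the attached scripts, and `get_site_packages_paths` is a hypothetical name. The helper requests both the pure-Python (`purelib`) and the platform-specific (`platlib`) locations, because these can resolve to different directories (lib vs. lib64) on some Linux distributions.

```python
import os
import sys
import sysconfig


def get_site_packages_paths(prefix):
    """Return the site-packages directories rooted at ``prefix``.

    Hypothetical helper (not Flink's code): it asks sysconfig for both the
    pure-Python ("purelib") and platform-specific ("platlib") locations,
    because on some distributions they differ (e.g. lib vs. lib64).
    """
    # Use the prefix-based scheme explicitly, so that distro or framework
    # default schemes do not redirect the paths outside of ``prefix``.
    scheme = "nt" if os.name == "nt" else "posix_prefix"
    # Override both base variables so that every path is rooted at ``prefix``.
    paths = sysconfig.get_paths(scheme=scheme, vars={"base": prefix, "platbase": prefix})
    result = []
    for key in ("purelib", "platlib"):
        path = paths[key]
        # On many builds purelib == platlib; keep each directory only once.
        if path not in result:
            result.append(path)
    return result


if __name__ == "__main__":
    # Like the attached scripts, take the environment prefix as argv[1].
    target = sys.argv[1] if len(sys.argv) > 1 else sys.prefix
    for site_dir in get_site_packages_paths(target):
        print(site_dir)
```

When run as a script, the helper takes the prefix as its first argument, as the attached scripts do (for example `python get_site_packages_sketch.py /tmp`, where the file name is hypothetical). On an interpreter whose `platlib` resolves to lib64, it would print both directories, matching the Flink 1.17 output. Where the two locations coincide, it prints only one.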

      Attachments

        1. batch_wc.py (7 kB, Prabhu Joseph)
        2. flink1.17-get_site_packages.py (0.4 kB, Prabhu Joseph)
        3. flink1.18-get_site_packages.py (0.1 kB, Prabhu Joseph)


            People

              prabhujoseph Prabhu Joseph
              prabhujoseph Prabhu Joseph
              Votes: 0
              Watchers: 2
