Flink / FLINK-23819

Testing tgz file for python archives


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Blocker
    • Resolution: Resolved
    • Affects Version/s: None
    • Fix Version/s: 1.14.0
    • Component/s: API / Python

    Description

      This feature adds support for tar.gz files as python archives. Previously, only zip files were supported as python archives.
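
      For reference, the following is a minimal sketch of how such an archive can be registered programmatically instead of via the -pyarch/-pyexec command-line options used in step 4 below; the archive name "myenv.tar.gz" and the target directory "myenv" are placeholders taken from this test plan.

      from pyflink.datastream import StreamExecutionEnvironment

      env = StreamExecutionEnvironment.get_execution_environment()

      # Register the packed conda environment. With this feature the archive may be a
      # .tar.gz/.tgz file, whereas previously only .zip archives were accepted.
      env.add_python_archive("myenv.tar.gz", "myenv")

      # Use the Python interpreter shipped inside the extracted archive for the Python
      # workers (the path is relative to the target directory given above).
      env.set_python_executable("myenv/bin/python")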

      This feature can be tested as follows:
      1) Build the PyFlink packages from source according to the documentation: https://ci.apache.org/projects/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink
      2) Prepare a tar.gz file which contains a conda Python virtual environment

      • Install Miniconda in your environment: https://conda.io/projects/conda/en/latest/user-guide/install/macos.html
      • Install conda-pack: https://conda.github.io/conda-pack/
      • Create the conda environment and install the PyFlink packages built in the previous step into it:
         conda create --name myenv
         conda activate myenv
         conda install python=3.8
         python -m pip install ~/code/src/apache/flink/flink-python/apache-flink-libraries/dist/apache-flink-libraries-1.14.dev0.tar.gz
         python -m pip install ~/code/src/apache/flink/flink-python/dist/apache_flink-1.14.dev0-cp38-cp38-macosx_10_9_x86_64.whl
        
      • You can verify the packages installed in the conda env *myenv* as follows:
        conda list -n myenv
        
      • Package the conda virtual environment into a tgz file (this will generate a file named myenv.tar.gz; a quick sanity check of the generated archive is sketched after this list):
         
        conda pack -n myenv
        
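      As an optional sanity check (not part of the original instructions), you can list the contents of the generated archive and verify that it ships a Python interpreter. conda-pack archives usually place it under bin/, but the exact member paths may differ, so treat this only as a rough check:

      import tarfile

      # List the members of the packed conda environment and look for bin/python*.
      with tarfile.open("myenv.tar.gz", "r:gz") as archive:
          names = archive.getnames()
      assert any(name.lstrip("./").startswith("bin/python") for name in names), \
          "no bin/python found in myenv.tar.gz"
      print("myenv.tar.gz contains %d entries" % len(names))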

      3) Prepare a PyFlink job; here is an example:

      import time
      from pyflink.common.typeinfo import Types
      from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
      from pyflink.table import StreamTableEnvironment, DataTypes, Schema
      
      
      def test_chaining():
          env = StreamExecutionEnvironment.get_execution_environment()
          t_env = StreamTableEnvironment.create(stream_execution_environment=env)
      
          # 1. create source Table
          t_env.execute_sql("""
              CREATE TABLE datagen (
                  id INT,
                  data STRING
              ) WITH (
                  'connector' = 'datagen',
                  'rows-per-second' = '1000000',
                  'fields.id.kind' = 'sequence',
                  'fields.id.start' = '1',
                  'fields.id.end' = '1000'
              )
          """)
      
          # 2. create sink Table
          t_env.execute_sql("""
              CREATE TABLE print (
                  id BIGINT,
                  data STRING,
                  flag STRING
              ) WITH (
                  'connector' = 'blackhole'
              )
          """)
      
          t_env.execute_sql("""
              CREATE TABLE print_2 (
                  id BIGINT,
                  data STRING,
                  flag STRING
              ) WITH (
                  'connector' = 'blackhole'
              )
          """)
      
          # 3. query from source table and perform calculations
          # create a Table from a Table API query:
          source_table = t_env.from_path("datagen")
      
          ds = t_env.to_append_stream(
              source_table,
              Types.ROW([Types.INT(), Types.STRING()]))
      
          ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
          ds2 = ds.map(lambda i: (i[0], i[1][2:]))
      
          class MyCoMapFunction(CoMapFunction):
      
              def map1(self, value):
                  return value
      
              def map2(self, value):
                  return value
      
          ds3 = ds1.connect(ds2).map(MyCoMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))
      
          ds4 = ds3.map(lambda i: (i[0], i[1], "left"),
                        output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))
      
          ds5 = ds3.map(lambda i: (i[0], i[1], "right"))\
                   .map(lambda i: i,
                        output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))
      
          schema = Schema.new_builder() \
              .column("f0", DataTypes.BIGINT()) \
              .column("f1", DataTypes.STRING()) \
              .column("f2", DataTypes.STRING()) \
              .build()
      
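          # 4. convert the result streams back to Tables and emit them via one statement set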
          result_table_3 = t_env.from_data_stream(ds4, schema)
          statement_set = t_env.create_statement_set()
          statement_set.add_insert("print", result_table_3)
      
          result_table_4 = t_env.from_data_stream(ds5, schema)
          statement_set.add_insert("print_2", result_table_4)
      
          statement_set.execute()
      
      
      if __name__ == "__main__":
      
          start_ts = time.time()
          test_chaining()
          end_ts = time.time()
          print("--- %s seconds ---" % (end_ts - start_ts))
         

      4) Submit the PyFlink job using the generated myenv.tar.gz:
      ./bin/flink run -d -m localhost:8081 -py test_pyflink.py -pyarch myenv.tar.gz#myenv -pyexec myenv/bin/python -pyclientexec myenv/bin/python

      5) The job should run normally and you should see log entries like the following in the TaskManager log file:

      2021-08-26 11:14:19,295 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/private/var/folders/jq/brl84gld47ngmcfyvwh2gtj40000gp/T/python-dist-a61682a6-79b0-443c-b3c8-f9dade55e5d6/python-archives/myenv/lib/python3.8/site-packages/pyflink/bin/pyflink-udf-runner.sh' for worker id 1-1
      

      This demonstrates that the Python worker was started using the Python interpreter contained in myenv.tar.gz.
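
      If you would rather not scan the logs manually, a small sketch like the following can search them for that message. The log path pattern below assumes a local standalone cluster started from the Flink distribution directory and is only an illustration; adjust it for your setup.

      import glob

      # Scan the TaskManager logs for the startup message of the Python worker that
      # points into the extracted myenv archive.
      for log_file in glob.glob("log/flink-*-taskexecutor-*.log"):
          with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
              for line in f:
                  if "python-archives/myenv" in line and "pyflink-udf-runner.sh" in line:
                      print("%s: %s" % (log_file, line.strip()))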

    People

    • Assignee: Leonard Xu (leonard)
    • Reporter: Dian Fu (dianfu)
