Status: Closed
Resolution: Resolved
This feature adds support for tar.gz files as Python archives. Previously, only zip files were supported as Python archives.
This feature can be tested as follows:
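For orientation, an archive can presumably also be attached from inside the job instead of on the command line. The sketch below assumes the `add_python_archive` and `set_python_executable` methods of PyFlink's `StreamExecutionEnvironment`; the helper name `attach_archive` is hypothetical, and this ticket itself only demonstrates the command-line route.

```python
def attach_archive(env, archive_path, target_dir):
    """Register an archive on a PyFlink execution environment.

    `env` is expected to be a StreamExecutionEnvironment. The archive is
    extracted into `target_dir` on the workers, and the worker interpreter
    is pointed at the Python binary inside that directory.
    Returns the interpreter path that was configured.
    """
    interpreter = f"{target_dir}/bin/python"
    env.add_python_archive(archive_path, target_dir)
    env.set_python_executable(interpreter)
    return interpreter


# Usage (requires PyFlink to be installed; shown as a comment only):
#   from pyflink.datastream import StreamExecutionEnvironment
#   env = StreamExecutionEnvironment.get_execution_environment()
#   attach_archive(env, "myenv.tar.gz", "myenv")
```

This mirrors the `-pyarch myenv.tar.gz#myenv -pyexec myenv/bin/python` options used in the test steps.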
1) Build PyFlink packages from source according to documentation:
2) Prepare a tar.gz file that contains the conda Python virtual environment:
- Install MiniConda in your environment:
- Install conda pack:
- Prepare the conda environment and install the PyFlink packages built in step 1 into it:
conda create --name myenv
conda activate myenv
conda install python=3.8
python -m pip install ~/code/src/apache/flink/flink-python/apache-flink-libraries/dist/apache-flink-libraries-1.14.dev0.tar.gz
python -m pip install ~/code/src/apache/flink/flink-python/dist/apache_flink-1.14.dev0-cp38-cp38-macosx_10_9_x86_64.whl
- You can verify the packages installed in the conda env *myenv* as follows:
conda list -n myenv
- Package the conda virtual environment into a tar.gz file (this generates a file named myenv.tar.gz):
conda pack -n myenv
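Before submitting a job, it can help to confirm that the packed archive really contains an interpreter. The following is a minimal sketch using only the standard library; it assumes that the archive stores the environment at its root (so the interpreter appears as `bin/python...`), and the helper name `find_interpreters` is hypothetical.

```python
import tarfile


def find_interpreters(archive_path):
    """Return the archive members under bin/ whose name starts with 'python'."""
    with tarfile.open(archive_path, "r:gz") as tar:
        names = tar.getnames()
    found = []
    for name in names:
        # Some tar writers prefix members with "./"; normalize that away.
        normalized = name[2:] if name.startswith("./") else name
        if normalized.startswith("bin/python"):
            found.append(normalized)
    return sorted(found)


# Usage (shown as a comment because it needs the generated archive):
#   print(find_interpreters("myenv.tar.gz"))
```

An empty result would suggest that the environment was packed without Python or with a different layout than assumed here.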
3) Prepare a PyFlink job, here is an example:
import time

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
from pyflink.table import StreamTableEnvironment, DataTypes, Schema


def test_chaining():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # 1. create source Table
    # NOTE: the three 'fields.id.*' option keys were blank in the original
    # text; the datagen option names used here are an assumption.
    t_env.execute_sql("""
        CREATE TABLE datagen (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '1000000',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '1000'
        )
    """)

    # 2. create sink Tables
    t_env.execute_sql("""
        CREATE TABLE print (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    t_env.execute_sql("""
        CREATE TABLE print_2 (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    # 3. query from source table and perform calculations
    # create a Table from a Table API query:
    source_table = t_env.from_path("datagen")

    ds = t_env.to_append_stream(
        source_table,
        Types.ROW([Types.INT(), Types.STRING()]))

    # NOTE: the receivers of the map calls for ds1, ds2, ds4 and ds5 were lost
    # in the original text; ds.map(...) and ds3.map(...) are assumed.
    ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
    ds2 = ds.map(lambda i: (i[0], i[1][2:]))

    class MyCoMapFunction(CoMapFunction):

        def map1(self, value):
            return value

        def map2(self, value):
            return value

    ds3 = ds1.connect(ds2).map(MyCoMapFunction(),
                               output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))

    ds4 = ds3.map(lambda i: (i[0], i[1], "left"),
                  output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))

    ds5 = ds3.map(lambda i: (i[0], i[1], "right")) \
             .map(lambda i: i,
                  output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))

    schema = Schema.new_builder() \
        .column("f0", DataTypes.BIGINT()) \
        .column("f1", DataTypes.STRING()) \
        .column("f2", DataTypes.STRING()) \
        .build()

    # write both branches to their sinks in a single statement set
    result_table_3 = t_env.from_data_stream(ds4, schema)
    statement_set = t_env.create_statement_set()
    statement_set.add_insert("print", result_table_3)

    result_table_4 = t_env.from_data_stream(ds5, schema)
    statement_set.add_insert("print_2", result_table_4)

    statement_set.execute()


if __name__ == "__main__":
    start_ts = time.time()
    test_chaining()
    end_ts = time.time()
    print("--- %s seconds ---" % (end_ts - start_ts))
4) Submit the PyFlink job using the generated myenv.tar.gz
./bin/flink run -d -m localhost:8081 -py -pyarch myenv.tar.gz#myenv -pyexec myenv/bin/python -pyclientexec myenv/bin/python
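The relationship between the options in the command above can be sketched as follows: `-pyarch` takes `archive#target_dir`, and the interpreter paths given to `-pyexec` and `-pyclientexec` are relative to that target directory. This is an illustrative helper, not part of Flink; the function names and the `job_script` argument (the job file path is not shown in the command above) are hypothetical.

```python
def split_archive_spec(spec):
    """Split 'archive#target_dir' into (archive, target_dir or None)."""
    archive, sep, target = spec.partition("#")
    return archive, (target if sep else None)


def build_flink_run_args(job_script, archive_spec, interpreter):
    """Assemble the argument list for the 'flink run' call shown above."""
    _, target = split_archive_spec(archive_spec)
    # The interpreter must live inside the directory the archive is
    # extracted to, otherwise the worker cannot find it.
    if target is not None and not interpreter.startswith(target + "/"):
        raise ValueError(
            f"interpreter {interpreter!r} is not inside target dir {target!r}")
    return [
        "./bin/flink", "run", "-d",
        "-m", "localhost:8081",
        "-py", job_script,
        "-pyarch", archive_spec,
        "-pyexec", interpreter,
        "-pyclientexec", interpreter,
    ]
```

The resulting list could be passed to `subprocess.run` from the Flink distribution directory; the consistency check catches a mismatch such as `-pyarch myenv.tar.gz#env` combined with `-pyexec myenv/bin/python` before submission.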
5) The job should run normally, and you should see log entries like the following in the TaskManager log file:
2021-08-26 11:14:19,295 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/private/var/folders/jq/brl84gld47ngmcfyvwh2gtj40000gp/T/python-dist-a61682a6-79b0-443c-b3c8-f9dade55e5d6/python-archives/myenv/lib/python3.8/site-packages/pyflink/bin/' for worker id 1-1
This shows that the Python worker was started using the Python interpreter contained in myenv.tar.gz.
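The check can also be done mechanically. The sketch below scans log lines for paths under `python-archives/` and reports which target directories appear; it assumes the path layout seen in the sample log line above, and the helper name `archive_targets` is hypothetical.

```python
import re

# Matches ".../python-archives/<target_dir>/..." and captures <target_dir>.
ARCHIVE_PATH = re.compile(r"python-archives/(?P<target>[^/\s']+)/")


def archive_targets(log_lines):
    """Return the set of archive target directories referenced in the log."""
    targets = set()
    for line in log_lines:
        for match in ARCHIVE_PATH.finditer(line):
            targets.add(match.group("target"))
    return targets


# Usage (shown as a comment because the log file path depends on the setup):
#   with open("path/to/taskmanager.log") as log_file:
#       print("myenv" in archive_targets(log_file))
```

If `myenv` is in the returned set, the worker environment was resolved from the extracted archive.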