Flink / FLINK-34985

Fields cannot be accessed by name in a map function in thread mode


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: API / Python
    • Labels: None

    Description

      Reported in the Slack channel: https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589

      hi all, I seem to be running into an issue when switching to thread mode in PyFlink. In a UDF, the Row seems to get converted into a tuple, and you can no longer access fields by name. In process mode it works fine. This bug can easily be reproduced using this minimal example, which is close to the PyFlink docs:

      ```python
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.common import Row
      from pyflink.table import StreamTableEnvironment, DataTypes
      from pyflink.table.udf import udf

      env = StreamExecutionEnvironment.get_execution_environment()
      t_env = StreamTableEnvironment.create(env)
      t_env.get_config().set("parallelism.default", "1")

      # This does work:
      t_env.get_config().set("python.execution-mode", "process")

      # This doesn't work:
      # t_env.get_config().set("python.execution-mode", "thread")

      def map_function(a: Row) -> Row:
          # Accesses the input fields by name (a.a, a.b)
          return Row(a.a + 1, a.b * a.b)

      # map operation with a Python general scalar function
      func = udf(
          map_function,
          result_type=DataTypes.ROW(
              [
                  DataTypes.FIELD("a", DataTypes.BIGINT()),
                  DataTypes.FIELD("b", DataTypes.BIGINT()),
              ]
          ),
      )
      table = (
          t_env.from_elements(
              [(2, 4), (0, 0)],
              schema=DataTypes.ROW(
                  [
                      DataTypes.FIELD("a", DataTypes.BIGINT()),
                      DataTypes.FIELD("b", DataTypes.BIGINT()),
                  ]
              ),
          )
          .map(func)
          .alias("a", "b")
          .execute()
          .print()
      )
      ```
         
        The exception I get in this execution mode is:
        2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
        2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
        2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
        2024-03-28 16:32:10 at <string>.<lambda>(<string>:1)
        2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
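Per the trace, the UDF receives a plain `tuple` in thread mode, so `a.a` raises `AttributeError`. The following is a minimal pure-Python sketch of that failure and of one possible workaround (positional access), not verified against a real thread-mode job. It assumes fields arrive in schema order (`a`, `b`). `NamedRow` is a hypothetical stand-in for `pyflink.common.Row`, chosen because both are tuple subclasses that expose fields as attributes. `map_by_name` and `map_by_position` are also illustrative names.

```python
from collections import namedtuple
from typing import Tuple

# Hypothetical stand-in for pyflink.common.Row: like Row, a namedtuple is a
# tuple subclass that also exposes its fields as attributes.
NamedRow = namedtuple("NamedRow", ["a", "b"])


def map_by_name(row) -> Tuple[int, int]:
    # Mirrors the reported UDF: only works if `row` exposes .a and .b
    return (row.a + 1, row.b * row.b)


def map_by_position(row) -> Tuple[int, int]:
    # Positional access works for both a named row and a plain tuple,
    # assuming the fields arrive in schema order (a, b)
    return (row[0] + 1, row[1] * row[1])


process_mode_input = NamedRow(2, 4)  # named row, as seen in process mode
thread_mode_input = (2, 4)           # plain tuple, as reported for thread mode

print(map_by_position(process_mode_input))  # (3, 16)
print(map_by_position(thread_mode_input))   # (3, 16)

try:
    map_by_name(thread_mode_input)
except AttributeError as exc:
    print(exc)  # 'tuple' object has no attribute 'a'
```

Positional access avoids the dependency on attribute lookup. However, it silently relies on field order, so it is a stopgap rather than a fix for the underlying conversion.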

          People

            Assignee: Unassigned
            Reporter: Dian Fu (dianfu)
            Votes: 1
            Watchers: 2
