Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
Reported in the Slack channel: https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589
hi all, I seem to be running into an issue when switching to thread mode in PyFlink. In a UDF, the Row seems to get converted into a tuple, so you can no longer access fields by their name. In process mode it works fine. The bug can easily be reproduced with this minimal example, which is close to the PyFlink docs:
```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")

# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
# t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)

table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
The exception I get in thread mode is:
2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10 at <string>.<lambda>(<string>:1)
2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
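Until this is fixed, a possible workaround is to read fields in a way that tolerates both input shapes. The sketch below is not a fix for the underlying bug. It assumes, based only on the traceback above, that thread mode passes the UDF a plain `tuple` whose positions follow the schema order (`a` first, `b` second). The helper names `get_field` and `map_values` are hypothetical, and `SimpleNamespace` is used only as a stand-in for a Row-like object with named attributes so the snippet runs without a Flink installation.

```python
from types import SimpleNamespace
from typing import Any, Tuple


def get_field(value: Any, name: str, index: int) -> Any:
    """Read a field by position from a plain tuple, otherwise by attribute name."""
    # Assumption: thread mode hands the UDF a plain tuple in schema order.
    if type(value) is tuple:
        return value[index]
    # Otherwise expect a Row-like object that exposes named attributes.
    return getattr(value, name)


def map_values(a: Any) -> Tuple[int, int]:
    """Same arithmetic as map_function in the report, independent of input shape."""
    a_val = get_field(a, "a", 0)
    b_val = get_field(a, "b", 1)
    return (a_val + 1, b_val * b_val)


# Plain tuple, as thread mode appears to deliver it.
print(map_values((2, 4)))
# Object with named attributes, standing in for a Row in process mode.
print(map_values(SimpleNamespace(a=2, b=4)))
```

In the reproduction script, `map_function` could then return `Row(*map_values(a))`. Whether that behaves the same in both execution modes has not been verified here.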