Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.12.5, 1.13.3
Description
The failed example:
@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()]) def StoTraceMqSourcePlugUDTF(s: str): import json try: data = json.loads(s) except Exception as e: return None source_code = "trace" try: shipment_no = data['shipMentNo'] except Exception as e: return None yield source_code, shipment_no class StoTraceFindNameUDTF(TableFunction): def eval(self, shipment_no): yield shipment_no, shipment_no sto_trace_find_name = udtf(StoTraceFindNameUDTF(), result_types=[DataTypes.STRING(), DataTypes.STRING()]) # self.env.set_parallelism(1) self.t_env.create_temporary_system_function( "StoTraceMqSourcePlugUDTF", StoTraceMqSourcePlugUDTF) self.t_env.create_temporary_system_function( "sto_trace_find_name", sto_trace_find_name ) source_table = self.t_env.from_elements([( '{"shipMentNo":"84210186879"}',)], ['biz_context']) # self.t_env.execute_sql(source_table) self.t_env.register_table("source_table", source_table) t = self.t_env.sql_query( "SELECT biz_context, source_code, shipment_no FROM source_table LEFT JOIN LATERAL TABLE(StoTraceMqSourcePlugUDTF(biz_context)) as T(source_code, shipment_no)" " ON TRUE") self.t_env.register_table("Table2", t) t = self.t_env.sql_query( "SELECT source_code, shipment_no, shipment_name, shipment_type FROM Table2 LEFT JOIN LATERAL TABLE(sto_trace_find_name(shipment_no)) as T(shipment_name, shipment_type)" " ON TRUE" ) print(t.to_pandas())
In the failed example, the input arguments of the second Python Table Function has the wrong positions mapping.
Attachments
Issue Links
- links to