Flink / FLINK-24860

Fix the wrong position mappings in the Python UDTF

Details

    Description

      The failing example:

              from pyflink.table import DataTypes
              from pyflink.table.udf import TableFunction, udtf

              @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
              def StoTraceMqSourcePlugUDTF(s: str):
                  import json
                  try:
                      data = json.loads(s)
                  except Exception as e:
                      return None
                  source_code = "trace"
                  try:
                      shipment_no = data['shipMentNo']
                  except Exception as e:
                      return None
                  yield source_code, shipment_no
      
              class StoTraceFindNameUDTF(TableFunction):
                  def eval(self, shipment_no):
                      yield shipment_no, shipment_no
      
              sto_trace_find_name = udtf(StoTraceFindNameUDTF(),
                                         result_types=[DataTypes.STRING(), DataTypes.STRING()])
      
              # self.env.set_parallelism(1)
              self.t_env.create_temporary_system_function(
                  "StoTraceMqSourcePlugUDTF", StoTraceMqSourcePlugUDTF)
              self.t_env.create_temporary_system_function(
                  "sto_trace_find_name", sto_trace_find_name
              )
              source_table = self.t_env.from_elements([(
                  '{"shipMentNo":"84210186879"}',)],
                  ['biz_context'])
              # self.t_env.execute_sql(source_table)
              self.t_env.register_table("source_table", source_table)
      
              t = self.t_env.sql_query(
                  "SELECT biz_context, source_code, shipment_no FROM source_table LEFT JOIN LATERAL TABLE(StoTraceMqSourcePlugUDTF(biz_context)) as T(source_code, shipment_no)"
                  " ON TRUE")
              self.t_env.register_table("Table2", t)
              t = self.t_env.sql_query(
                  "SELECT source_code, shipment_no, shipment_name, shipment_type FROM Table2 LEFT JOIN LATERAL TABLE(sto_trace_find_name(shipment_no)) as T(shipment_name, shipment_type)"
                  " ON TRUE"
              )
              print(t.to_pandas())
      

      In the failing example, the input arguments of the second Python table function (sto_trace_find_name) are mapped to the wrong positions, so it does not receive the shipment_no column it was called with.
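
      To make the expected result concrete, here is a minimal plain-Python sketch of the two chained lateral joins. It does not use PyFlink and is not how Flink implements the join. The helper left_join_lateral and its arg_index parameter are hypothetical names introduced only for this illustration, and the "wrong" index at the end is an assumed example of a bad mapping, not the behaviour observed in the report.

```python
import json


def sto_trace_mq_source_plug(s):
    """Plain-Python stand-in for StoTraceMqSourcePlugUDTF."""
    try:
        shipment_no = json.loads(s)['shipMentNo']
    except Exception:
        return  # emit no rows on malformed input
    yield "trace", shipment_no


def sto_trace_find_name(shipment_no):
    """Plain-Python stand-in for StoTraceFindNameUDTF.eval."""
    yield shipment_no, shipment_no


def left_join_lateral(rows, func, arg_index, width=2):
    """Hypothetical model of LEFT JOIN LATERAL TABLE(func(col)) ON TRUE.

    The column at position arg_index is passed to func and every emitted
    row is appended to the input row. If func emits nothing, the input
    row is padded with None values.
    """
    for row in rows:
        produced = list(func(row[arg_index]))
        if not produced:
            yield row + (None,) * width
        for out in produced:
            yield row + tuple(out)


source_table = [('{"shipMentNo":"84210186879"}',)]

# Table2 columns: (biz_context, source_code, shipment_no)
table2 = list(left_join_lateral(source_table, sto_trace_mq_source_plug, 0))

# Correct mapping: shipment_no sits at position 2 of Table2.
joined = left_join_lateral(table2, sto_trace_find_name, 2)
# Project (source_code, shipment_no, shipment_name, shipment_type).
result = [row[1:] for row in joined]

# Assumed wrong mapping for illustration: position 0 feeds biz_context instead.
wrong = [row[1:] for row in left_join_lateral(table2, sto_trace_find_name, 0)]

print(result)
print(wrong)
```

      With the correct position, shipment_name and shipment_type both equal shipment_no. With a shifted position, the second function silently receives a different column, which is the class of error the issue title describes.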

            People

              hxbks2ks Huang Xingbo