Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-17093

Python UDF doesn't work when the input column is from composite field

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      For the following job:

      
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, EnvironmentSettings, CsvTableSink
      from pyflink.table.descriptors import Schema, Kafka, Json
      from pyflink.table import DataTypes
      from pyflink.table.udf import ScalarFunction, udf
      import os
      
      @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
       DataTypes.STRING()],
       result_type=DataTypes.STRING())
      def get_host_ip(source, qr, sip, dip):
          if source == "NGAF" and qr == '1':
              return dip
          return sip
      
      @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
       DataTypes.STRING()],
       result_type=DataTypes.STRING())
      def get_dns_server_ip(source, qr, sip, dip):
          if source == "NGAF" and qr == '1':
              return sip
          return dip
      
      def test_case():
          env = StreamExecutionEnvironment.get_execution_environment()
          env.set_parallelism(1)
          t_env = StreamTableEnvironment.create(env)
      
           from pyflink.table import Row
         table = t_env.from_elements(
            [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip", qr="qr", queries="queries", answers="answers", qtypes="qtypes", atypes="atypes", rcode="rcode", ts="ts",))],
          DataTypes.ROW([DataTypes.FIELD("stype", DataTypes.STRING()),
       DataTypes.FIELD("data",
       DataTypes.ROW([DataTypes.FIELD('source', DataTypes.STRING()),
       DataTypes.FIELD("devid", DataTypes.STRING()),
       DataTypes.FIELD('sip', DataTypes.STRING()),
       DataTypes.FIELD('dip', DataTypes.STRING()),
       DataTypes.FIELD("qr", DataTypes.STRING()),
       DataTypes.FIELD("queries", DataTypes.STRING()),
       DataTypes.FIELD("answers", DataTypes.STRING()),
       DataTypes.FIELD("qtypes", DataTypes.STRING()),
       DataTypes.FIELD("atypes", DataTypes.STRING()),
       DataTypes.FIELD("rcode", DataTypes.STRING()),
       DataTypes.FIELD("ts", DataTypes.STRING())]))
       ]
       ))
      
       result_file = "/tmp/test.csv"
       if os.path.exists(result_file):
       os.remove(result_file)
      
       t_env.register_table_sink("Results",
       CsvTableSink(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'],
       [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
       DataTypes.STRING(),
       DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
       DataTypes.STRING(), DataTypes.STRING(),
       DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()],
       "/tmp/test.csv"))
      
       t_env.register_function("get_host_ip", get_host_ip)
       t_env.register_function("get_dns_server_ip", get_dns_server_ip)
      
       t_env.register_table("source", table)
       standard_table = t_env.sql_query("select data.*, stype as dns_type from source")\
       .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')")
       t_env.register_table("standard_table", standard_table)
      
       final_table = t_env.sql_query("SELECT *, get_host_ip(source, qr, sip, dip) as host_ip,"
       "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip FROM standard_table")
      
       final_table.insert_into("Results")
      
       t_env.execute("test")
      
      
      if __name__ == '__main__':
       test_case()
      

      The plan is as following which is not correct:

       org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaTableSource(type, data) -> Map -> where: (IN(type, _UTF-16LE'DNSFULL', _UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), select: (data, type) -> select: (type, get_host_ip(type.source, type.qr, type.sip, type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS f1) -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type, f0 AS host_ip, f1 AS dns_server_ip) -> to: Row -> Sink: KafkaTableSink(source, devid, sip, dip, qr, queries, answers, qtypes, atypes, rcode, ts, dns_type, host_ip, dns_server_ip) (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from RUNNING to FAILED.
      

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            dian.fu Dian Fu
            dian.fu Dian Fu
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 40m
                40m

                Slack

                  Issue deployment