Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-30849

udaf validated failed with TableEnvironment#executeSql but work correctly with StreamTableEnrivorment#registerFunction

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Invalid
    • None
    • None
    • API / Core, Table SQL / API
    • None

    Description

      We have a udaf which has some overloaded methods and it can work in flink 1.12.2 with deprecated api StreamTableEnrivorment#registerFunction, but when use TableEnvironment#executeSql in flink 1.12.2 or flink 1.16, it will throw an exception as follows:

      Caused by: org.apache.flink.table.api.ValidationException: Considering all hints, the method should comply with the signature:
      accumulate(_, java.lang.String, java.lang.Object, java.lang.Object)
          at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
          at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
          at org.apache.flink.table.types.extraction.FunctionMappingExtractor.createMethodNotFoundError(FunctionMappingExtractor.java:535) 

      In flink 1.16, the method
      <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction); does noe exist anymore, and I want to how to rewrite the udaf to make it works.
      The test code is as follows: 

      TableEnvironment tableEnvironment = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
      //        tableEnvironment.registerFunction("MultiAggToJsonArrayV2", new MultiAggToJsonArrayV2());
              tableEnvironment.executeSql("CREATE FUNCTION `MultiAggToJsonArrayV2` AS 'com.sankuai.flink.streaming.udf.MultiAggToJsonArrayV2'");
              tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test`(`a` VARCHAR,`b` INTEGER,`c` VARCHAR,`d` VARCHAR) WITH " +
                      "('connector'='datagen')\n");
              tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test_sink`(`a` VARCHAR,`res` VARCHAR) WITH ('connector'='blackhole')\n" );
              tableEnvironment.executeSql("INSERT INTO `grocery_udf_test_sink` SELECT `a`, `MultiAggToJsonArrayV2`('b', '', '', '') AS `res` FROM `grocery_udf_test` GROUP BY `a`"); 

      MultiAggToJsonArrayV2.java

      Attachments

        1. MultiAggToJsonArrayV2.java
          36 kB
          Jianhui Dong

        Activity

          People

            Unassigned Unassigned
            lam167 Jianhui Dong
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: