Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Invalid
Description
We have a UDAF with several overloaded methods. It works in Flink 1.12.2 when registered through the deprecated API StreamTableEnvironment#registerFunction, but when it is registered through TableEnvironment#executeSql, in either Flink 1.12.2 or Flink 1.16, it throws the following exception:
Caused by: org.apache.flink.table.api.ValidationException: Considering all hints, the method should comply with the signature:
accumulate(_, java.lang.String, java.lang.Object, java.lang.Object)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.createMethodNotFoundError(FunctionMappingExtractor.java:535)
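To make the message above more concrete: it reports that no accumulate method complying with the signature accumulate(_, String, Object, Object) was found on the function class. The following is only a rough, plain-Java illustration of that kind of reflective signature check, not Flink's actual extraction code. The classes Acc and OverloadedAgg and the helper hasMatchingAccumulate are hypothetical names invented for this sketch, and the real UDAF from this report is not shown here.

```java
import java.lang.reflect.Method;

public class AccumulateSignatureCheck {

    /** Hypothetical accumulator; stands in for the UDAF's real accumulator type. */
    public static class Acc {
        public int calls = 0;
    }

    /** Hypothetical UDAF-like class with overloaded accumulate methods. */
    public static class OverloadedAgg {
        public void accumulate(Acc acc, String key, String value) {
            acc.calls++;
        }

        public void accumulate(Acc acc, String key, Object v1, Object v2) {
            acc.calls++;
        }
    }

    /**
     * Returns true if clazz has a public method named "accumulate" whose first
     * parameter accepts accType and whose remaining parameters accept argTypes.
     * This is a simplified assumption about what "comply with the signature" means.
     */
    public static boolean hasMatchingAccumulate(
            Class<?> clazz, Class<?> accType, Class<?>... argTypes) {
        for (Method m : clazz.getMethods()) { // getMethods() returns public methods only
            if (!m.getName().equals("accumulate")) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != argTypes.length + 1) {
                continue; // wrong number of arguments
            }
            if (!params[0].isAssignableFrom(accType)) {
                continue; // first parameter must take the accumulator
            }
            boolean allAssignable = true;
            for (int i = 0; i < argTypes.length; i++) {
                if (!params[i + 1].isAssignableFrom(argTypes[i])) {
                    allAssignable = false;
                    break;
                }
            }
            if (allAssignable) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Signature from the error message: (String, Object, Object)
        System.out.println(hasMatchingAccumulate(
                OverloadedAgg.class, Acc.class, String.class, Object.class, Object.class));
        // Four string arguments: no overload of this hypothetical class takes four
        System.out.println(hasMatchingAccumulate(
                OverloadedAgg.class, Acc.class,
                String.class, String.class, String.class, String.class));
    }
}
```

Under these assumptions, a check like this passes only when some overload has the right number of parameters and each parameter type can accept the corresponding argument type. When reading the exception, it may therefore help to compare each accumulate overload of the UDAF against the reported signature one parameter at a time.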
In Flink 1.16, the method
<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction); no longer exists, and I want to know how to rewrite the UDAF to make it work.
The test code is as follows:
TableEnvironment tableEnvironment = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
// tableEnvironment.registerFunction("MultiAggToJsonArrayV2", new MultiAggToJsonArrayV2());
tableEnvironment.executeSql("CREATE FUNCTION `MultiAggToJsonArrayV2` AS 'com.sankuai.flink.streaming.udf.MultiAggToJsonArrayV2'");
tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test`(`a` VARCHAR,`b` INTEGER,`c` VARCHAR,`d` VARCHAR) WITH " + "('connector'='datagen')\n");
tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test_sink`(`a` VARCHAR,`res` VARCHAR) WITH ('connector'='blackhole')\n");
tableEnvironment.executeSql("INSERT INTO `grocery_udf_test_sink` SELECT `a`, `MultiAggToJsonArrayV2`('b', '', '', '') AS `res` FROM `grocery_udf_test` GROUP BY `a`");