Details
Type: New Feature
Status: Closed
Priority: Major
Resolution: Fixed
None
Description
Currently, distinct aggregates are supported in streaming queries. However, executing the same query in batch mode, as in the following test:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row
    import scala.collection.mutable

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val sqlQuery =
      "SELECT b, " +
      "  SUM(DISTINCT (a / 3)), " +
      "  COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2)), " +
      "  COUNT(DISTINCT c) " +
      "FROM MyTable " +
      "GROUP BY b"

    val data = new mutable.MutableList[(Int, Long, String)]
    data += ((1, 1L, "Hi"))
    data += ((2, 2L, "Hello"))
    data += ((3, 2L, "Hello world"))
    data += ((4, 3L, "Hello world, how are you?"))
    data += ((5, 3L, "I am fine."))
    data += ((6, 3L, "Luke Skywalker"))
    data += ((7, 4L, "Comment#1"))
    data += ((8, 4L, "Comment#2"))
    data += ((9, 4L, "Comment#3"))
    data += ((10, 4L, "Comment#4"))
    data += ((11, 5L, "Comment#5"))
    data += ((12, 5L, "Comment#6"))
    data += ((13, 5L, "Comment#7"))
    data += ((14, 5L, "Comment#8"))
    data += ((15, 5L, "Comment#9"))
    data += ((16, 6L, "Comment#10"))
    data += ((17, 6L, "Comment#11"))
    data += ((18, 6L, "Comment#12"))
    data += ((19, 6L, "Comment#13"))
    data += ((20, 6L, "Comment#14"))
    data += ((21, 6L, "Comment#15"))

    val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    tEnv.registerTable("MyTable", t)
    tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()

fails with:
org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
    at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
    at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
    at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
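The trace shows the failure happening in the batch plan's joins rather than in the aggregation itself: two DataSetJoin nodes ask the code generator for an IS NOT DISTINCT FROM join condition. This suggests the planner splits the three distinct aggregates into separate aggregations grouped by b and joins them back on b using null-safe equality, so a NULL grouping key still matches itself. The sketch below emulates that plan with plain Scala collections. It is not Flink code; the object and method names are hypothetical, and it assumes `a / 3` is integer division and `SUBSTRING(c FROM 1 FOR 2)` keeps the first two characters.

```scala
// Hypothetical sketch: emulates the join-based batch plan with plain Scala
// collections. Not Flink code; all names here are illustrative only.
object DistinctAggSketch {
  type Rec = (Int, Long, String) // (a, b, c)

  // Null-safe equality, i.e. the semantics of IS NOT DISTINCT FROM.
  def isNotDistinctFrom(x: Any, y: Any): Boolean =
    if (x == null) y == null else x.equals(y)

  // Same rows as in the test above.
  val data: Seq[Rec] =
    Seq((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"),
      (4, 3L, "Hello world, how are you?"), (5, 3L, "I am fine."),
      (6, 3L, "Luke Skywalker")) ++
      (7 to 10).map(a => (a, 4L, s"Comment#${a - 6}")) ++
      (11 to 15).map(a => (a, 5L, s"Comment#${a - 6}")) ++
      (16 to 21).map(a => (a, 6L, s"Comment#${a - 6}"))

  // One separate aggregation per distinct aggregate, each grouped by b.
  def aggregateByB[V](data: Seq[Rec])(f: Seq[Rec] => V): Seq[(Long, V)] =
    data.groupBy(_._2).toSeq.map { case (b, rows) => (b, f(rows)) }

  // (b, SUM(DISTINCT a / 3), COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2)), COUNT(DISTINCT c))
  def query(data: Seq[Rec]): Seq[(Long, Int, Long, Long)] = {
    val sums = aggregateByB[Int](data)(rows => rows.map(_._1 / 3).distinct.sum)
    val prefixes = aggregateByB[Long](data)(rows => rows.map(_._3.take(2)).distinct.size.toLong)
    val strings = aggregateByB[Long](data)(rows => rows.map(_._3).distinct.size.toLong)
    // Two joins on b with null-safe key equality, mirroring the two DataSetJoin nodes.
    val joined = for {
      (b, s) <- sums
      (b2, p) <- prefixes if isNotDistinctFrom(b, b2)
      (b3, n) <- strings if isNotDistinctFrom(b, b3)
    } yield (b, s, p, n)
    joined.sortBy(_._1)
  }

  def main(args: Array[String]): Unit =
    query(data).foreach(println)
}
```

In this data b is never NULL, so a plain `=` join condition would return the same rows; the null-safe form only changes the result when the grouping key can be NULL, which is why the planner cannot simply fall back to `=`.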
Issue Links
- relates to
  - FLINK-10846 Add support for IS DISTINCT FROM in code generator (Resolved)
  - FLINK-10847 Add support for IS NOT DISTINCT FROM in code generator (Resolved)
- links to
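The two linked issues add code-generator support for IS DISTINCT FROM and IS NOT DISTINCT FROM. These operators differ from `=` only when an operand is NULL: `=` yields UNKNOWN, while IS [NOT] DISTINCT FROM always yields TRUE or FALSE and treats two NULLs as not distinct. The following is a minimal Scala sketch of those semantics, modeling a nullable value as `Option` and UNKNOWN as `None`. The object and method names are hypothetical, and this is not the code Flink generates.

```scala
// Hypothetical sketch of the SQL semantics; not Flink's code generator output.
object NullSafeComparison {
  // Plain SQL '=': UNKNOWN (None) if either operand is NULL.
  def sqlEquals[T](x: Option[T], y: Option[T]): Option[Boolean] =
    for (a <- x; b <- y) yield a == b

  // IS NOT DISTINCT FROM: TRUE if both are NULL or both hold equal values.
  // Option equality already has exactly this behavior.
  def isNotDistinctFrom[T](x: Option[T], y: Option[T]): Boolean = x == y

  // IS DISTINCT FROM: the negation of IS NOT DISTINCT FROM.
  def isDistinctFrom[T](x: Option[T], y: Option[T]): Boolean =
    !isNotDistinctFrom(x, y)

  def main(args: Array[String]): Unit = {
    val cases: Seq[(Option[Int], Option[Int])] =
      Seq((Some(1), Some(1)), (Some(1), Some(2)), (Some(1), None), (None, None))
    for ((x, y) <- cases)
      println(s"$x, $y: = ${sqlEquals(x, y)}, " +
        s"IS NOT DISTINCT FROM ${isNotDistinctFrom(x, y)}, " +
        s"IS DISTINCT FROM ${isDistinctFrom(x, y)}")
  }
}
```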