Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-10845

Support DISTINCT aggregates for batch

    XML | Word | Printable | JSON

Details

    Description

      Currently, we support DISTINCT aggregates for streaming. However, executing the same query in batch mode, as in the following test:

          val env = ExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(env)
      
          val sqlQuery =
            "SELECT b, " +
            "  SUM(DISTINCT (a / 3)), " +
            "  COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
            "  COUNT(DISTINCT c) " +
            "FROM MyTable " +
            "GROUP BY b"
      
          val data = new mutable.MutableList[(Int, Long, String)]
          data.+=((1, 1L, "Hi"))
          data.+=((2, 2L, "Hello"))
          data.+=((3, 2L, "Hello world"))
          data.+=((4, 3L, "Hello world, how are you?"))
          data.+=((5, 3L, "I am fine."))
          data.+=((6, 3L, "Luke Skywalker"))
          data.+=((7, 4L, "Comment#1"))
          data.+=((8, 4L, "Comment#2"))
          data.+=((9, 4L, "Comment#3"))
          data.+=((10, 4L, "Comment#4"))
          data.+=((11, 5L, "Comment#5"))
          data.+=((12, 5L, "Comment#6"))
          data.+=((13, 5L, "Comment#7"))
          data.+=((14, 5L, "Comment#8"))
          data.+=((15, 5L, "Comment#9"))
          data.+=((16, 6L, "Comment#10"))
          data.+=((17, 6L, "Comment#11"))
          data.+=((18, 6L, "Comment#12"))
          data.+=((19, 6L, "Comment#13"))
          data.+=((20, 6L, "Comment#14"))
          data.+=((21, 6L, "Comment#15"))
      
      
          val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
          tEnv.registerTable("MyTable", t)
      
          tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
      

      Fails with:

      org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM 
      If you think this function should be supported, you can create an issue and start a discussion for it.
      
      	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
      	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
      	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
      	at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
      	at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
      	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
      	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
      	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
      	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
      	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
      	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
      	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
      	at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
      	at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
      	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
      

      Attachments

        Issue Links

          Activity

            People

              xueyu xueyu
              twalthr Timo Walther
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: