Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6101

GroupBy fields with arithmetic expression (include UDF) can not be selected

    XMLWordPrintableJSON

Details

    Description

      currently the TableAPI do not support selecting GroupBy fields with expression either using original field name or the expression

       val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
              .groupBy('e, 'b % 3)
              .select('b, 'c.min, 'e, 'a.avg, 'd.count)
      

      caused

      org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
      

      (BTW, this syntax is invalid in RDBMS which will indicate the selected column is invalid in the select list because it is not contained in either an aggregate function or the GROUP BY clause in SQL Server.)

      and

       val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
              .groupBy('e, 'b % 3)
              .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
      

      will also cause

      org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
      

      and add an alias in groupBy clause "group(e, 'b%3 as 'b)" work without avail.
      and apply an UDF doesn’t work either

         table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, 'd.count, 'e.avg)
      
      org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, TMP_1, TMP_2].
      

      the only way to get this work can be

       val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
              .select('a, 'b%3 as 'b, 'c, 'd, 'e)
              .groupBy('e, 'b)
              .select('b, 'c.min, 'e, 'a.avg, 'd.count)
      

      One way to solve this is to add support alias in groupBy clause ( it seems a bit odd against SQL though TableAPI has a different groupBy grammar),
      and I prefer to support select original expressions and UDF in groupBy clause(make consistent with SQL).

      as thus:

      // use expression
      val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
              .groupBy('e, 'b % 3)
              .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count)
      
      // use UDF
      val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
              .groupBy('e, Mod('b,3))
              .select(Mod('b,3), 'c.min, 'e, 'a.avg, 'd.count)
      

      After had a look into the code, found there was a problem in the groupBy implementation, validation hadn't considered the expressions in groupBy clause. it should be noted that a table has been actually changed after groupBy operation ( a new Table) and the groupBy keys replace the original field reference in essence.

      What do you think?

      Attachments

        Issue Links

          Activity

            People

              lincoln.86xy lincoln lee
              lincoln.86xy lincoln lee
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 10m
                  10m