Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-31165

Over Agg: The window rank function without order by error in top N query

    XMLWordPrintableJSON

Details

    Description

       

      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      
      val tableEnv = StreamTableEnvironment.create(env)
      
      
      val td = TableDescriptor.forConnector("datagen").option("rows-per-second", "10")
        .option("number-of-rows", "10")
        .schema(Schema
          .newBuilder()
          .column("NAME", DataTypes.VARCHAR(2147483647))
          .column("ROLLNO", DataTypes.DECIMAL(5, 0))
          .column("DOB", DataTypes.DATE())
          .column("CLASS", DataTypes.DECIMAL(2, 0))
          .column("SUBJECT", DataTypes.VARCHAR(2147483647))
          .build())
        .build()
      
      val table = tableEnv.from(td)
      
      
      tableEnv.createTemporaryView("temp_table", table)
      
      val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as date) SRC_NO from temp_table")
      
      tableEnv.createTemporaryView("temp_table2", newTable)
      
      
      val newTable2 = tableEnv.sqlQuery("select * from (select NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum  from temp_table2 a) where rownum <= 1")
      
      tableEnv.toChangelogStream(newTable2).print()
      
      env.execute()
       

       

       

      I am getting the below error if I run the above code.

      I have already provided an order by column.

      If I change the order by column to some other column, such as "SUBJECT", then the job runs fine.

       

       

      Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args [rel#245:LogicalWindow.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
          at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
          at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
          at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
          at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
          at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
          at scala.collection.Iterator.foreach(Iterator.scala:943)
          at scala.collection.Iterator.foreach$(Iterator.scala:943)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
          at scala.collection.IterableLike.foreach(IterableLike.scala:74)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
          at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
          at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
          at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
          at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
          at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
          at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
          at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
          at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
          at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
          at org.example.OverAggregateBug.main(OverAggregateBug.scala)
      Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
          at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
          at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
          at scala.collection.Iterator.foreach(Iterator.scala:943)
          at scala.collection.Iterator.foreach$(Iterator.scala:943)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
          at scala.collection.IterableLike.foreach(IterableLike.scala:74)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
          at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
          at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
          at scala.collection.Iterator.foreach(Iterator.scala:943)
          at scala.collection.Iterator.foreach$(Iterator.scala:943)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
          at scala.collection.IterableLike.foreach(IterableLike.scala:74)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
          at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
          at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
          at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
          ... 27 more 

       

       

      Attachments

        Activity

          People

            qingyue Jane Chan
            rohankrao P Rohan Kumar
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: