Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Description
A ValidationException is thrown if the partition key of a Rank is the result of an expression computed in its input node. For example, running the following SQL throws a ValidationException:
@Test
def test(): Unit = {
  val data = List(
    (2001L, 2L),
    (2002L, 3L)
  )
  val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt, 'proctime.proctime)
  tEnv.registerTable("T", ds)
  val sql =
    """
      |SELECT
      |  video_id,
      |  cnt,
      |  rownum_2
      |FROM
      |(
      |  SELECT
      |    video_id,
      |    cnt,
      |    ROW_NUMBER() OVER (
      |      ORDER BY cnt DESC
      |    ) AS rownum_2
      |  FROM
      |  (
      |    SELECT
      |      video_id,
      |      cnt,
      |      ROW_NUMBER() OVER (
      |        PARTITION BY bucket_id
      |        ORDER BY cnt DESC
      |      ) AS rownum_1
      |    FROM
      |    (
      |      SELECT
      |        video_id,
      |        cnt,
      |        MOD(video_id, 64) as bucket_id
      |      FROM T
      |    )
      |  )
      |  WHERE rownum_1 <= 1000
      |)
      |WHERE rownum_2 <= 1000
      |""".stripMargin
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
}
Exception detail
org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [$2]
    at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277)
    at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
    at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162)
    at org.apache.flink.table.types.logical.RowType.of(RowType.java:294)
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
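The failing check at the top of this trace can be pictured with a small standalone sketch. This is not Flink's `RowType.validateFields` implementation, only an illustration of the uniqueness rule the error message describes. The object name `DuplicateFieldNames`, its methods, and the field list in `main` are hypothetical; only the duplicate name `$2` and the message text come from the trace.

```scala
// Minimal sketch of a "field names must be unique" check.
// Assumption: this only mirrors the behaviour implied by the exception
// message; it is not the code in org.apache.flink.table.types.logical.RowType.
object DuplicateFieldNames {

  // Returns every name that occurs more than once, in first-seen order.
  def findDuplicates(names: Seq[String]): Seq[String] =
    names.distinct.filter(n => names.count(_ == n) > 1)

  // Throws if any field name is repeated, using the wording from the trace.
  def validate(names: Seq[String]): Unit = {
    val dups = findDuplicates(names)
    if (dups.nonEmpty) {
      val listed = dups.mkString("[", ", ", "]")
      throw new IllegalArgumentException(
        "Field names must be unique. Found duplicates: " + listed)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical row type: two columns end up with the generated name "$2".
    val fields = Seq("video_id", "cnt", "$2", "$2")
    try validate(fields)
    catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}
```

Any row type in which two columns share a generated name such as `$2` fails a check of this kind, regardless of how the planner arrived at those names.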
When `FlinkLogicalRankRuleBase` is applied to `FlinkLogicalCalc`-`FlinkLogicalOverAggregate`, it produces a `FlinkLogicalRank` with the following rowType. A `StreamExecRank` with the same rowType is then generated.