  1. Flink
  2. FLINK-23919

PullUpWindowTableFunctionIntoWindowAggregateRule generates invalid Calc for Window TVF


Details

    Description

      Given the following Window TVF:

      SELECT window_time, 
             MIN(alert_timestamp) as start_time, 
             MAX(alert_timestamp) as end_time 
      FROM TABLE(TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL '3' MINUTE)) 
      WHERE service_source = 'source' 
      GROUP BY window_start, window_end, window_time
      

      Where the schema of alert_table is:

      alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
      service_source: VARCHAR

      Optimizing this query fails because the rule produces an invalid RowType:

      Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time), rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE](input=RelSubset#355,distribution=single), rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, _UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], size=[3 min]))]
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
        at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
        at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
        at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: java.lang.RuntimeException: Error occurred while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
        at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
        at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
        at org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
        ... 31 more
      Caused by: org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [alert_timestamp]
        at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272)
        at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157)
        at org.apache.flink.table.types.logical.RowType.of(RowType.java:297)
        at org.apache.flink.table.types.logical.RowType.of(RowType.java:289)
        at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
        at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
        at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391)
        at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
        at java.base/java.util.HashMap.hash(HashMap.java:339)
        at java.base/java.util.HashMap.get(HashMap.java:552)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
      

      Looking at the code, it seems that when PullUpWindowTableFunctionIntoWindowAggregateRule builds the new Calc in WindowUtil.buildNewProgramWithoutWindowColumns, it adds the rowtime column from the input row to the new Calc without checking for name collisions. Also, I'm not yet sure why the rowtime column of the input table is added to the projected output row in the first place.
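
      To illustrate the suspected collision, here is a minimal, self-contained sketch in plain Java. It does not use Flink or Calcite classes and is not the actual fix: the class UniqueFieldNames, both of its methods, and the "_0" suffix scheme are hypothetical names chosen for this example. It assumes the rewritten Calc already projects a field named alert_timestamp when the rowtime column of the same name is appended.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Flink: models field names as plain strings.
final class UniqueFieldNames {

    /** Returns the names that occur more than once, in first-seen order. */
    static Set<String> findDuplicates(List<String> names) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String name : names) {
            if (!seen.add(name)) {
                duplicates.add(name);
            }
        }
        return duplicates;
    }

    /** Returns a copy of existing with candidate appended, renamed if the name is taken. */
    static List<String> appendUnique(List<String> existing, String candidate) {
        Set<String> taken = new HashSet<>(existing);
        String name = candidate;
        int suffix = 0;
        // Assumed naming scheme: try candidate, then candidate_0, candidate_1, ...
        while (taken.contains(name)) {
            name = candidate + "_" + suffix++;
        }
        List<String> result = new ArrayList<>(existing);
        result.add(name);
        return result;
    }

    public static void main(String[] args) {
        // Assumed projection of the new Calc after the window columns are dropped.
        List<String> projected = List.of("alert_timestamp");

        // Appending the rowtime column blindly yields a duplicate name.
        List<String> naive = new ArrayList<>(projected);
        naive.add("alert_timestamp");
        System.out.println(findDuplicates(naive));

        // Checking for collisions first yields unique names.
        System.out.println(appendUnique(projected, "alert_timestamp"));
    }
}
```

      Whether renaming the appended column or reusing the existing projection is the right approach in WindowUtil is an open question; the sketch only shows that a uniqueness check before appending avoids the duplicate-name validation error.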

       

      jark, I would appreciate your help with this.

      Attachments

        1. image-2021-08-23-13-31-24-052.png
          191 kB
          Yuval Itzchakov

            People

              jingzhang Jing Zhang
              Yuval.Itzchakov Yuval Itzchakov