Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-24835

"group by" in the interval join will throw a exception

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      Can reproduce this bug by the following code added into IntervalJoinTest.scala:

      @Test
      def testSemiIntervalJoinWithSimpleConditionAndGroup(): Unit = {
        val sql =
          """
            |SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
            | SELECT t2.a FROM MyTable2 t2
            |   WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime + INTERVAL '5' MINUTE
            |   GROUP BY t2.a
            |)
          """.stripMargin
        util.verifyExecPlan(sql)
      } 

      The exception is :

      java.lang.IllegalStateException
          at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
          at org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalJoinRule.matches(StreamPhysicalJoinRule.scala:64)
          at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
          at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
          at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
          at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
          at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
          at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
          at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
          at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
          at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
          at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:64)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
          at scala.collection.Iterator.foreach(Iterator.scala:937)
          at scala.collection.Iterator.foreach$(Iterator.scala:937)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
          at scala.collection.IterableLike.foreach(IterableLike.scala:70)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:60)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:165)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
          at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
          at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:309)
          at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:894)
          at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:790)
          at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:591)
          at org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest.testSemiIntervalJoinWithSimpleConditionAndGroup(IntervalJoinTest.scala:76) 

      It is caused by that the agg casts the time indicator type to the common timestamp.

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            jingzhang Jing Zhang
            xuyangzhong xuyang
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Issue deployment