Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-22154

PushFilterIntoTableSourceScanRule fails to deal with IN expressions

    XMLWordPrintableJSON

Details

    Description

      Add the following test case to PushFilterIntoLegacyTableSourceScanRuleTest to reproduce this bug. PushFilterIntoTableSourceScanRuleTest extends this class and will also be tested.

      @Test
      def myTest(): Unit = {
        util.verifyRelPlan("SELECT * FROM MyTable WHERE name IN ('Alice', 'Bob', 'Dave')")
      }
      

      The exception stack is

      java.lang.AssertionError: OR(OR(=($0, _UTF-16LE'Alice':VARCHAR(5) CHARACTER SET "UTF-16LE"), =($0, _UTF-16LE'Bob':VARCHAR(5) CHARACTER SET "UTF-16LE")), =($0, _UTF-16LE'Dave':VARCHAR(5) CHARACTER SET "UTF-16LE"))
      
      	at org.apache.calcite.rel.core.Filter.<init>(Filter.java:76)
      	at org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
      	at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
      	at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
      	at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
      	at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
      	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
      	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
      	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
      	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
      	at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
      	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
      	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
      	at scala.collection.immutable.List.foreach(List.scala:392)
      	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46)
      	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
      	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:281)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyRelPlan(TableTestBase.scala:400)
      	at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRuleTest.myTest(PushFilterIntoLegacyTableSourceScanRuleTest.scala:76)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
      	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
      

      This is because Filter in calcite requires its condition to be "flat", so we should first simplify the conditions before constructing a filter.

      Attachments

        Issue Links

          Activity

            People

              TsReaper Caizhi Weng
              TsReaper Caizhi Weng
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: