Flink / FLINK-25097

Bug in inner join when the filter condition is of boolean type


Details

    Description

      When I test an inner join whose filter condition is on a BOOLEAN column, an error occurs during SQL conversion (query planning).
      The SQL statements are as follows:

      source-1:
      "CREATE TABLE IF NOT EXISTS data_source (\n" +
      " id INT,\n" +
      " name STRING,\n" +
      " sex boolean\n" +
      ") WITH (\n" +
      " 'connector' = 'datagen',\n" +
      " 'rows-per-second'='1',\n" +
      " 'fields.id.kind'='sequence',\n" +
      " 'fields.id.start'='1',\n" +
      " 'fields.id.end'='10',\n" +
      " 'fields.name.kind'='random',\n" +
      " 'fields.name.length'='10'\n" +
      ")";   
      source-2:
      "CREATE TABLE IF NOT EXISTS info (\n" +
      " id INT,\n" +
      " name STRING,\n" +
      " sex boolean\n" +
      ") WITH (\n" +
      " 'connector' = 'datagen',\n" +
      " 'rows-per-second'='1',\n" +
      " 'fields.id.kind'='sequence',\n" +
      " 'fields.id.start'='1',\n" +
      " 'fields.id.end'='10',\n" +
      " 'fields.name.kind'='random',\n" +
      " 'fields.name.length'='10'\n" +
      ")";   
      sink:
      "CREATE TABLE IF NOT EXISTS print_sink ( \n" +
      " id INT,\n" +
      " name STRING,\n" +
      " left_sex boolean,\n" +
      " right_sex boolean\n" +
      ") WITH (\n" +
      " 'connector' = 'print'\n" +
      ")";    
      
      SQL-1:
      "insert into print_sink" +
      " select l.id, l.name, l.sex, r.sex from data_source l " +
      "inner join info r on l.sex = r.sex where l.sex is true";

      The SQL fails with:

      The program finished with the following exception:
      org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error while applying rule FlinkLogicalCalcConverter(in:NONE,out:LOGICAL), args [rel#135:LogicalCalc.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#115,expr#0..5={inputs},proj#0..2={exprs},3=$t5)]
              at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
              at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
              at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
              at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
              at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
              at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
              at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:422)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
              at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
              at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
      Caused by: java.lang.RuntimeException: Error while applying rule FlinkLogicalCalcConverter(in:NONE,out:LOGICAL), args [rel#135:LogicalCalc.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#115,expr#0..5={inputs},proj#0..2={exprs},3=$t5)]
              at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
              at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
              at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
              at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
              at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
              at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
              at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
              at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
              at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
              at scala.collection.Iterator$class.foreach(Iterator.scala:891)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
              at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
              at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
              at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
              at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
              at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
              at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
              at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
              at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
              at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:100)
              at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
              at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:625)
              at com.xue.testSql.main(testSql.java:60)
              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
              at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
              ... 11 more
      Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalCalcConverter(in:NONE,out:LOGICAL)
              at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
              at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
              at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
              at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:169)
              at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
              ... 39 more
      Caused by: java.lang.ClassCastException: org.apache.calcite.rex.RexInputRef cannot be cast to org.apache.calcite.rex.RexCall
              at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$.org$apache$flink$table$planner$plan$utils$ColumnIntervalUtil$$columnIntervalOfSinglePredicate(ColumnIntervalUtil.scala:236)
              at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$$anonfun$5$$anonfun$6.apply(ColumnIntervalUtil.scala:223)
              at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$$anonfun$5$$anonfun$6.apply(ColumnIntervalUtil.scala:223)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.Iterator$class.foreach(Iterator.scala:891)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
              at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
              at scala.collection.AbstractTraversable.map(Traversable.scala:104)
              at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$$anonfun$5.apply(ColumnIntervalUtil.scala:223)
              at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$$anonfun$5.apply(ColumnIntervalUtil.scala:221)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.Iterator$class.foreach(Iterator.scala:891)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
              at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
              at scala.collection.AbstractTraversable.map(Traversable.scala:104)
              at org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil$.getColumnIntervalWithFilter(ColumnIntervalUtil.scala:221)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getColumnIntervalOfCalc(FlinkRelMdColumnInterval.scala:227)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getColumnInterval(FlinkRelMdColumnInterval.scala:203)
              at GeneratedMetadataHandler_ColumnInterval.getColumnInterval_$(Unknown Source)
              at GeneratedMetadataHandler_ColumnInterval.getColumnInterval(Unknown Source)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getColumnInterval(FlinkRelMetadataQuery.java:112)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getColumnInterval(FlinkRelMdColumnInterval.scala:801)
              at GeneratedMetadataHandler_ColumnInterval.getColumnInterval_$(Unknown Source)
              at GeneratedMetadataHandler_ColumnInterval.getColumnInterval(Unknown Source)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getColumnInterval(FlinkRelMetadataQuery.java:112)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount$$anonfun$1.apply(FlinkRelMdRowCount.scala:308)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount$$anonfun$1.apply(FlinkRelMdRowCount.scala:306)
              at scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
              at scala.collection.IndexedSeqOptimized$class.exists(IndexedSeqOptimized.scala:46)
              at scala.collection.mutable.ArrayBuffer.exists(ArrayBuffer.scala:48)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount.getEquiInnerJoinRowCount(FlinkRelMdRowCount.scala:306)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount.getRowCount(FlinkRelMdRowCount.scala:268)
              at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
              at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
              at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:212)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount.getRowCount(FlinkRelMdRowCount.scala:410)
              at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
              at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
              at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:212)
              at org.apache.calcite.rel.metadata.RelMdUtil.estimateFilteredRows(RelMdUtil.java:766)
              at org.apache.calcite.rel.metadata.RelMdUtil.estimateFilteredRows(RelMdUtil.java:761)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdRowCount.getRowCount(FlinkRelMdRowCount.scala:62)
              at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
              at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
              at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:212)
              at org.apache.flink.table.planner.plan.nodes.common.CommonCalc.computeSelfCost(CommonCalc.scala:59)
              at org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
              at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
              at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
              at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:288)
              at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:705)
              at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:415)
              at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:398)
              at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1268)
              at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1227)
              at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
              at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
              at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
              ... 43 more
      

      I tried several Flink versions (1.12.2, 1.13.3 and 1.14.0), and the error occurs in all of them during executeSql.

      There is one difference in 1.12.2: it already reports the error above when explaining SQL-1, whereas the other versions explain SQL-1 successfully.

      I then modified SQL-1, changing the filter "l.sex is true" to "l.sex is false":

      SQL-2:
      insert into print_sink select l.id, l.name, l.sex, r.sex from data_source l inner join info r on l.sex = r.sex where l.sex is false

      SQL-2 runs normally.
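      The report does not say why only the IS TRUE variant fails. One plausible explanation, which is an assumption and not confirmed here, is that the planner simplifies predicates in a WHERE clause, where an UNKNOWN (NULL) result drops the row just like FALSE. Under that rule "l.sex is true" filters exactly like the bare column l.sex, which would be a RexInputRef and matches the ClassCastException above, while "l.sex is false" filters like NOT(l.sex), which is still a RexCall. The Java sketch below only checks that these two equivalences hold under SQL three-valued logic (UNKNOWN modeled as a null Boolean). It uses no Flink or Calcite API, and its class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch: checks two WHERE-clause equivalences under SQL
 * three-valued logic, modeling UNKNOWN as a null Boolean:
 *   x IS TRUE  keeps the same rows as  x
 *   x IS FALSE keeps the same rows as  NOT x
 */
public class FilterEquivalenceSketch {

    // The three SQL truth values: TRUE, FALSE and UNKNOWN (null).
    static final List<Boolean> VALUES = Arrays.asList(Boolean.TRUE, Boolean.FALSE, null);

    // SQL NOT: NOT UNKNOWN is UNKNOWN.
    public static Boolean sqlNot(Boolean x) {
        if (x == null) {
            return null;
        }
        return !x;
    }

    // IS TRUE / IS FALSE never return UNKNOWN.
    public static boolean isTrue(Boolean x) { return Boolean.TRUE.equals(x); }
    public static boolean isFalse(Boolean x) { return Boolean.FALSE.equals(x); }

    // A WHERE clause keeps a row only when the predicate is TRUE; UNKNOWN drops it.
    public static boolean keeps(Boolean predicate) { return Boolean.TRUE.equals(predicate); }

    public static boolean isTrueFiltersLikeBareColumn() {
        for (Boolean x : VALUES) {
            if (keeps(isTrue(x)) != keeps(x)) {
                return false;
            }
        }
        return true;
    }

    public static boolean isFalseFiltersLikeNot() {
        for (Boolean x : VALUES) {
            if (keeps(isFalse(x)) != keeps(sqlNot(x))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isTrueFiltersLikeBareColumn()); // true
        System.out.println(isFalseFiltersLikeNot());       // true
        // Outside a filter the rewrite would be wrong: UNKNOWN IS TRUE is FALSE, not UNKNOWN.
        System.out.println(isTrue(null));                  // false
    }
}
```

      The check is done per truth value rather than per row because a filter's decision depends only on the predicate's value for that row.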

       

      I then tried modifying org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil#columnIntervalOfSinglePredicate as follows:

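      private def columnIntervalOfSinglePredicate(condition: RexNode): ValueInterval = {
        // Added guard: a predicate that is not a RexCall (e.g. a bare BOOLEAN
        // column reference, i.e. a RexInputRef) would make the cast below throw
        // ClassCastException. Returning null means no interval is derived from it.
        if (!condition.isInstanceOf[RexCall]) {
          return null
        }
        val convertedCondition = condition.asInstanceOf[RexCall]
        ...
      }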

      With this change, both SQL-1 and SQL-2 run normally. The results are in the attached ConditionTrueResult.txt and ConditionFalseResult.txt.
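      The guard in the patch above is an ordinary checked downcast. The following self-contained Java sketch models it without Flink or Calcite on the classpath: Node, InputRef and Call are hypothetical stand-ins for RexNode, RexInputRef and RexCall, and a String (the operator name) stands in for the ValueInterval the real code derives. It shows the unconditional cast throwing ClassCastException for a bare column reference, while the guarded version returns null. The collect method is also hypothetical and assumes, as a simplification, that the caller simply ignores null results from individual predicates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model of the failing cast in columnIntervalOfSinglePredicate.
 * Node, InputRef and Call are stand-ins, NOT the real Calcite classes.
 */
public class GuardedCastSketch {

    /** Stand-in for RexNode. */
    public interface Node {}

    /** Stand-in for RexInputRef: a bare column reference such as l.sex. */
    public static final class InputRef implements Node {
        public final int index;
        public InputRef(int index) { this.index = index; }
    }

    /** Stand-in for RexCall: an operator applied to operands, e.g. NOT(l.sex). */
    public static final class Call implements Node {
        public final String op;
        public final List<Node> operands;
        public Call(String op, List<Node> operands) {
            this.op = op;
            this.operands = operands;
        }
    }

    /** Like the original code: casts unconditionally, so a bare column reference throws. */
    public static String unguarded(Node condition) {
        Call call = (Call) condition;
        return call.op; // placeholder for the derived interval
    }

    /** Like the proposed fix: anything that is not a call yields null ("no interval"). */
    public static String guarded(Node condition) {
        if (!(condition instanceof Call)) {
            return null;
        }
        Call call = (Call) condition;
        return call.op; // placeholder for the derived interval
    }

    /** Hypothetical caller: evaluates each AND-ed predicate and drops null results. */
    public static List<String> collect(List<Node> andParts) {
        List<String> results = new ArrayList<>();
        for (Node part : andParts) {
            String r = guarded(part);
            if (r != null) {
                results.add(r);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Node bareRef = new InputRef(2);                                           // like l.sex
        Node notCall = new Call("NOT", Collections.<Node>singletonList(bareRef)); // like NOT(l.sex)

        System.out.println(guarded(notCall));                          // NOT
        System.out.println(guarded(bareRef));                          // null
        System.out.println(collect(Arrays.asList(bareRef, notCall)));  // [NOT]
        try {
            unguarded(bareRef);
        } catch (ClassCastException e) {
            System.out.println("unguarded cast failed: " + e.getClass().getSimpleName());
        }
    }
}
```

      Returning null keeps the change local to one method. An alternative design would be to return an explicit "unknown" interval, but that would require touching every caller.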

       

       

      Attachments

        1. ColumnIntervalUtil.scala
          12 kB
          Chu Xue
        2. ConditionFalseResult.txt
          0.9 kB
          Chu Xue
        3. ConditionTrueResult.txt
          0.7 kB
          Chu Xue
        4. errorLog.txt
          11 kB
          Chu Xue


          People

            337361684@qq.com Yunhong Zheng
            xuechu Chu Xue
            Votes: 0
            Watchers: 8
