Flink / FLINK-25592: Improvement of parser, optimizer and execution for Flink Batch SQL / FLINK-25095

CASE WHEN is translated into different expressions in the Hive dialect and the default dialect

    Description

      When using the Blink planner's batch mode with the Hive dialect enabled, the following exception is thrown when a subquery field is used in a CASE WHEN expression.
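      The failing query shape, extracted from the reproducing test further below, is a nested aggregation where the outer CASE WHEN references a field (t.num) computed by an inner CASE WHEN aggregate (this is a minimal sketch, assuming a table src1(key string, val string) as in the test):

```sql
-- Inner query: COUNT(CASE WHEN ...) produces the derived field num.
-- Outer query: comparing t.num = 1 inside another CASE WHEN is where the
-- 'BOOLEAN NOT NULL' vs 'BOOLEAN' argument-type mismatch surfaces during
-- code generation under the Hive dialect.
SELECT t.key,
       COUNT(CASE WHEN t.num = 1 THEN 1 ELSE NULL END)
FROM (SELECT key,
             COUNT(CASE WHEN key = '1' THEN 1 ELSE NULL END) AS num
      FROM src1
      GROUP BY key, val) t
GROUP BY t.key;
```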

      org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of function's argument data type 'BOOLEAN NOT NULL' and actual argument type 'BOOLEAN'.
          at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:326)
          at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:323)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
          at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyArgumentTypes(BridgingFunctionGenUtil.scala:323)
          at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:98)
          at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:65)
          at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:73)
          at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:811)
          at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:501)
          at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
          at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
          at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
          at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$2.apply(CalcCodeGenerator.scala:152)
          at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$2.apply(CalcCodeGenerator.scala:152)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
          at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:177)
          at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:50)
          at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
          at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:95)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
          at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate.translateToPlanInternal(BatchExecHashAggregate.java:84)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
          at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.java:103)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
          at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate.translateToPlanInternal(BatchExecHashAggregate.java:84)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
          at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
          at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:83)
          at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:82)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.Iterator$class.foreach(Iterator.scala:891)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:82)
          at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1524)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:797)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1233)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:734)
          at org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase.testCodeGenFunctionArgumentType(TableEnvHiveConnectorITCase.java:175)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
          at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
          at org.junit.rules.RunRules.evaluate(RunRules.java:20)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
          at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
          at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
          at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
      Process finished with exit code 255
       

      This problem can be reproduced by adding the following unit test, TableEnvHiveConnectorITCase#testCodeGenFunctionArgumentType:

      @Test
      public void testCodeGenFunctionArgumentType() throws Exception {
          TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
          tableEnv.loadModule("hive", new HiveModule());
          tableEnv.useModules("hive", "core");
      
          tableEnv.executeSql("create database db1");
          try {
              tableEnv.useDatabase("db1");
              tableEnv.executeSql("create table src1(key string, val string)");
              HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src1")
                      .addRow(new Object[] {"1", "val1"})
                      .addRow(new Object[] {"2", "val2"})
                      .addRow(new Object[] {"3", "val3"})
                      .commit();
      
              List<Row> results =
                      CollectionUtil.iteratorToList(
                              tableEnv.executeSql(
                                              "select t.key, count(case when t.num=1 then 1 else null end) from "
                                                      + "(select key,count(case when key='1' then 1 else null end) as num from src1 group by key,val) t "
                                                      + "group by t.key")
                                      .collect());
              assertEquals("[+I[1, 1], +I[2, 0], +I[3, 0]]", results.toString());
          } finally {
              tableEnv.useDatabase("default");
              tableEnv.executeSql("drop database db1 cascade");
          }
      } 

      People

        Assignee: Unassigned
        Reporter: xiangqiao
        Votes: 0
        Watchers: 8