Flink / FLINK-5827

Exception when filtering after joining a UDTF that returns a POJO type

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      The test case:

      testFilterUdtfOutputPojo
       
      @Test
      def testFilterUdtfOutputPojo(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
        // Table function (UDTF) whose output type is a POJO
        val pojoFunc1 = new PojoTableFunc()
        tEnv.registerFunction("pojo1", pojoFunc1)

        // Join each input row with the UDTF output, then filter on the POJO fields
        val result = CollectionDataSets.getSmall3TupleDataSet(env)
          .toTable(tEnv, 'a, 'b, 'c)
          .join(pojoFunc1('c))
          .where(('age > 0) && ('name !== ""))
          .select('a, 'b, 'c, 'age, 'name)

        // Translating the table to a DataSet is where the exception is thrown
        val results = result.toDataSet[Row].collect()
      }
      
      

      It throws the following exception:

      org.apache.flink.table.codegen.CodeGenException: No input mapping is specified for input1 of type POJO.
      
        at org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80)
        at org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.flink.table.codegen.CodeGenerator.<init>(CodeGenerator.scala:79)
        at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:191)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.generateCollector(DataSetCorrelate.scala:37)
        at org.apache.flink.table.plan.nodes.CommonCorrelate$class.correlateMapFunction(CommonCorrelate.scala:70)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.correlateMapFunction(DataSetCorrelate.scala:37)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.translateToPlan(DataSetCorrelate.scala:101)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:277)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:256)
        at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
        at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)
        at org.apache.flink.table.api.scala.stream.table.UserDefinedTableFunctionTest.testFilterUdtfOutputPojo(UserDefinedTableFunctionTest.scala:399)
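
The top frames of the trace (`Option.getOrElse` called from the `CodeGenerator` constructor) suggest that the generator was constructed for a POJO-typed input without a field mapping. The sketch below is a simplified, Flink-free model of that kind of check, not Flink's actual code. Every name in it (`CodeGenSketch`, `InputType`, `PojoInput`, `RowInput`, `MiniCodeGenerator`, `tryGenerate`) is hypothetical, and the assumption that non-POJO inputs fall back to an identity mapping is made only for illustration.

```scala
object CodeGenSketch {
  // Hypothetical stand-ins for the input type information.
  sealed trait InputType
  final case class PojoInput(fieldNames: Seq[String]) extends InputType
  final case class RowInput(arity: Int) extends InputType

  final class CodeGenException(msg: String) extends RuntimeException(msg)

  // Simplified model of a generator that validates its input on construction.
  final class MiniCodeGenerator(
      input1: InputType,
      input1PojoFieldMapping: Option[Array[Int]]) {

    // A POJO has no positional field order, so the caller must supply a mapping.
    // Assumption: positional inputs default to the identity mapping.
    val fieldMapping: Array[Int] = input1 match {
      case PojoInput(_) =>
        input1PojoFieldMapping.getOrElse(
          throw new CodeGenException(
            "No input mapping is specified for input1 of type POJO."))
      case RowInput(arity) =>
        input1PojoFieldMapping.getOrElse(Array.range(0, arity))
    }
  }

  // Returns the resolved mapping, or the error message if construction fails.
  def tryGenerate(
      input: InputType,
      mapping: Option[Array[Int]]): Either[String, Seq[Int]] =
    try Right(new MiniCodeGenerator(input, mapping).fieldMapping.toSeq)
    catch { case e: CodeGenException => Left(e.getMessage) }
}
```

In this model, `CodeGenSketch.tryGenerate(CodeGenSketch.PojoInput(Seq("age", "name")), None)` yields a `Left` carrying the same message as the reported exception, while passing `Some(Array(0, 1))` succeeds. This mirrors the scenario in the test case, where the filter is applied to the POJO output of the UDTF.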
      

            People

            • Assignee: kaibo.zhou
            • Reporter: kaibo.zhou