FLINK-5827: Exception when filtering after joining a UDTF that returns a POJO type

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table SQL / API
    • Labels: None

    Description

      The test case:

      testFilterUdtfOutputPojo
       
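      @Test
      def testFilterUdtfOutputPojo(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
        // Table function whose result type is a POJO with fields 'age and 'name
        val pojoFunc1 = new PojoTableFunc()
        tEnv.registerFunction("pojo1", pojoFunc1)

        // Join the input with the UDTF output, then filter on the POJO fields
        val result = CollectionDataSets.getSmall3TupleDataSet(env)
          .toTable(tEnv, 'a, 'b, 'c)
          .join(pojoFunc1('c))
          .where(('age > 0) && ('name !== ""))
          .select('a, 'b, 'c, 'age, 'name)

        // Converting to a DataSet translates the plan, which is where the exception is thrown
        val results = result.toDataSet[Row].collect()
      }
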

      It throws the following exception:

      org.apache.flink.table.codegen.CodeGenException: No input mapping is specified for input1 of type POJO.
      
        at org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80)
        at org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.flink.table.codegen.CodeGenerator.<init>(CodeGenerator.scala:79)
        at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:191)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.generateCollector(DataSetCorrelate.scala:37)
        at org.apache.flink.table.plan.nodes.CommonCorrelate$class.correlateMapFunction(CommonCorrelate.scala:70)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.correlateMapFunction(DataSetCorrelate.scala:37)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.translateToPlan(DataSetCorrelate.scala:101)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:277)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:256)
        at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
        at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)
        at org.apache.flink.table.api.scala.stream.table.UserDefinedTableFunctionTest.testFilterUdtfOutputPojo(UserDefinedTableFunctionTest.scala:399)
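
      The trace shows the exception coming from an Option.getOrElse call inside the CodeGenerator constructor, which suggests that an optional field mapping for the POJO input was not supplied when the correlate collector was generated. The following is a minimal standalone sketch of that failure pattern, not Flink's actual code: InputKind, resolveMapping, the identity fallback for positional inputs, and the local CodeGenException class are all hypothetical names introduced for illustration.

```scala
// Standalone model of the failure pattern. Every name here is hypothetical;
// none of this is Flink's actual implementation.
object InputMappingSketch {

  // Stand-in for the exception type seen in the stack trace.
  final class CodeGenException(msg: String) extends RuntimeException(msg)

  // Inputs are either positional (row- or tuple-like) or name-based (POJO-like).
  sealed trait InputKind
  case object Positional extends InputKind
  case object Pojo extends InputKind

  // Assumption: a positional input can use the identity mapping 0 until arity,
  // while a POJO input requires an explicit field mapping.
  def resolveMapping(
      kind: InputKind,
      arity: Int,
      pojoFieldMapping: Option[Array[Int]]): Array[Int] = kind match {
    case Positional => (0 until arity).toArray
    case Pojo =>
      // Mirrors the Option.getOrElse frame in the trace: a missing mapping fails here.
      pojoFieldMapping.getOrElse(
        throw new CodeGenException(
          "No input mapping is specified for input1 of type POJO."))
  }

  def main(args: Array[String]): Unit = {
    println(resolveMapping(Positional, 3, None).mkString(","))
    println(resolveMapping(Pojo, 2, Some(Array(1, 0))).mkString(","))
    // A POJO input without a mapping reproduces the error message.
    try resolveMapping(Pojo, 2, None)
    catch { case e: CodeGenException => println("failed: " + e.getMessage) }
  }
}
```

      Under this reading, a fix would pass the POJO field mapping to the code generator whenever the input type is a POJO; whether that matches the actual change made for this issue is not stated in the report.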
      

          People

            Assignee: Kaibo Zhou (kaibo.zhou)
            Reporter: Kaibo Zhou (kaibo.zhou)