FLINK-5827: Exception when applying a filter after joining a UDTF that returns a POJO type

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      The test case:

      testFilterUdtfOutputPojo
       
      @Test
      def testFilterUdtfOutputPojo(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
        val pojoFunc1 = new PojoTableFunc()
        tEnv.registerFunction("pojo1", pojoFunc1)

        val result = CollectionDataSets.getSmall3TupleDataSet(env)
          .toTable(tEnv, 'a, 'b, 'c)
          .join(pojoFunc1('c))
          .where(('age > 0) && ('name !== ""))
          .select('a, 'b, 'c, 'age, 'name)

        val results = result.toDataSet[Row].collect()
      }


      It throws the following exception:

      org.apache.flink.table.codegen.CodeGenException: No input mapping is specified for input1 of type POJO.
      
        at org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80)
        at org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.flink.table.codegen.CodeGenerator.<init>(CodeGenerator.scala:79)
        at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:191)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.generateCollector(DataSetCorrelate.scala:37)
        at org.apache.flink.table.plan.nodes.CommonCorrelate$class.correlateMapFunction(CommonCorrelate.scala:70)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.correlateMapFunction(DataSetCorrelate.scala:37)
        at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.translateToPlan(DataSetCorrelate.scala:101)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:277)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:256)
        at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
        at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)
        at org.apache.flink.table.api.scala.stream.table.UserDefinedTableFunctionTest.testFilterUdtfOutputPojo(UserDefinedTableFunctionTest.scala:399)
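
      The PojoTableFunc used in the test is not shown in this report. Below is a minimal sketch of what such a POJO-returning table function could look like; the PojoUser type, its fields, and the eval body are assumptions for illustration, not the actual Flink test utility.

      import org.apache.flink.table.functions.TableFunction

      // A POJO result type: public mutable fields and a no-argument constructor.
      class PojoUser {
        var name: String = ""
        var age: Int = 0
      }

      // Hypothetical UDTF that emits one POJO row per input string.
      class PojoTableFunc extends TableFunction[PojoUser] {
        def eval(s: String): Unit = {
          val user = new PojoUser
          user.name = s
          user.age = s.length
          collect(user) // emit the POJO; its type is analyzed as a POJO type
        }
      }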
      


          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3357

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: ff552b440e3d493b41083b6b63534cfcd83961d9

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3357

          Thanks @kaibozhou. I will merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user kaibozhou commented on the issue:

          https://github.com/apache/flink/pull/3357

          Hi @twalthr, thank you for the review. I have updated the PR according to your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user kaibozhou commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3357#discussion_r102693470

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala —
          @@ -113,4 +114,26 @@ class TableWithSQLITCase(
               val results = result.toDataSet[Row].collect()
               TestBaseUtils.compareResultAsText(results.asJava, expected)
             }
          +
          +  @Test
          +  def testUDTFWithPojoType(): Unit = {
          — End diff –

          I found there was already a testPojoType case in DataSetUserDefinedFunctionITCase, so I extended it, and I also extended testCrossJoin in DataStreamUserDefinedFunctionITCase. Do you think that's OK?

          githubbot ASF GitHub Bot added a comment -

          Github user kaibozhou commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3357#discussion_r102692495

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala —
          @@ -188,7 +189,12 @@ trait CommonCorrelate {
                 getCollector().collect(${crossResultExpr.resultTerm});
                 """.stripMargin
               } else {
          -      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
          +      val filterGenerator =
          +        udtfTypeInfo match {
          +          case pt: PojoTypeInfo[_] => new CodeGenerator(config, false, udtfTypeInfo, None,
          — End diff –

          Yes, there is no need to pattern match here.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3357#discussion_r102498672

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala —
          @@ -113,4 +114,26 @@ class TableWithSQLITCase(
               val results = result.toDataSet[Row].collect()
               TestBaseUtils.compareResultAsText(results.asJava, expected)
             }
          +
          +  @Test
          +  def testUDTFWithPojoType(): Unit = {
          — End diff –

          We should keep ITCases to a minimum, because they are very expensive. In particular, we should not add 4 ITCases for a one-line change. I think it is sufficient if we just extend an existing test in `DataSetUserDefinedFunctionITCase` and `DataStreamUserDefinedFunctionITCase`. You could e.g. rename `testLongAndTemporalTypes` to `testDifferentTypes` and add a POJO there.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3357#discussion_r102497060

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala —
          @@ -188,7 +189,12 @@ trait CommonCorrelate {
                 getCollector().collect(${crossResultExpr.resultTerm});
                 """.stripMargin
               } else {
          -      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
          +      val filterGenerator =
          +        udtfTypeInfo match {
          +          case pt: PojoTypeInfo[_] => new CodeGenerator(config, false, udtfTypeInfo, None,
          — End diff –

          I think we don't need pattern matching here. `pojoFieldMapping` is an Option anyway, so we can pass it directly.
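
          To make the suggestion concrete, here is a minimal, self-contained sketch using a hypothetical Generator case class in place of Flink's internal CodeGenerator: because the field mapping is already an Option, it can be forwarded as-is, and POJO and non-POJO inputs share one code path.

          object FieldMappingExample {

            // Stand-in for the internal code generator; the parameters are illustrative only.
            case class Generator(nullableInput: Boolean, inputFieldMapping: Option[Array[Int]] = None)

            // Forward the Option directly instead of pattern matching on the input type.
            def buildFilterGenerator(pojoFieldMapping: Option[Array[Int]]): Generator =
              Generator(nullableInput = false, inputFieldMapping = pojoFieldMapping)

            def main(args: Array[String]): Unit = {
              val withPojo    = buildFilterGenerator(Some(Array(1, 0))) // POJO input: mapping present
              val withoutPojo = buildFilterGenerator(None)              // non-POJO input: no mapping
              println(s"$withPojo, $withoutPojo")
            }
          }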

          kaibo.zhou Kaibo Zhou added a comment -

          The patch is ready, but I don't have permission to assign the issue to myself. Could someone grant me contributor permission?

          Thanks a lot.


            People

            • Assignee: kaibo.zhou Kaibo Zhou
            • Reporter: kaibo.zhou Kaibo Zhou
            • Votes: 0
            • Watchers: 4
