FLINK-21725

DataTypeExtractor extracts wrong field order for Tuple12


Details

    Description

      The following test can reproduce the problem:

      // Imports needed by the snippet.
      import org.apache.flink.api.java.tuple.Tuple12;
      import org.apache.flink.table.functions.TableFunction;

      /** Emit Tuple12 result. */
      public static class JavaTableFuncTuple12
              extends TableFunction<
                      Tuple12<
                              String,
                              String,
                              String,
                              String,
                              String,
                              String,
                              Integer,
                              Integer,
                              Integer,
                              Integer,
                              Integer,
                              Integer>> {
          private static final long serialVersionUID = -8258882510989374448L;

          public void eval(String str) {
              collect(
                      Tuple12.of(
                              str + "_a",
                              str + "_b",
                              str + "_c",
                              str + "_d",
                              str + "_e",
                              str + "_f",
                              str.length(),
                              str.length() + 1,
                              str.length() + 2,
                              str.length() + 3,
                              str.length() + 4,
                              str.length() + 5));
          }
      }
      
      @Test
      def testCorrelateTuple12(): Unit = {
        val util = streamTestUtil()
        util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
        val function = new JavaTableFuncTuple12
        util.addTemporarySystemFunction("func1", function)
        val sql =
          """
            |SELECT *
            |FROM MyTable, LATERAL TABLE(func1(c)) AS T
            |""".stripMargin

        util.verifyExecPlan(sql)
      }
      
      // output plan
      Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,f0,f1,f10,f11,f2,f3,f4,f5,f6,f7,f8,f9], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, INTEGER f10, INTEGER f11, VARCHAR(2147483647) f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER f6, INTEGER f7, INTEGER f8, INTEGER f9)], joinType=[INNER])
      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
      

      Note that the problem does not occur when the function is registered with the legacy tEnv.registerFunction, because that path uses TypeInformation. It does occur with tEnv.createTemporaryFunction or the CREATE FUNCTION syntax, because those paths use TypeInference.

      The problem is present in the latest 1.11 and 1.12 versions and on the master branch.

      I think the problem might lie in this line: https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L562

      because it sorts the field names alphabetically, which places f10 and f11 before f2, as seen in the output plan above.
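
      The effect of an alphabetical ordering on tuple field names can be reproduced without Flink. The following is only a minimal sketch: the class TupleFieldOrder and its methods are hypothetical names made up for illustration and are not part of DataTypeExtractor. It shows that sorting f0 to f11 as plain strings yields the same order as the select list in the plan above, while ordering by the numeric suffix keeps the declaration order. The numeric ordering is just one way to illustrate the expected result, not necessarily how the issue should be fixed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TupleFieldOrder {

    /** Builds the tuple field names f0 .. f(arity-1) in declaration order. */
    public static List<String> declared(int arity) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            names.add("f" + i);
        }
        return names;
    }

    /** Sorts the names as plain strings, i.e. alphabetically. */
    public static List<String> lexicographic(List<String> names) {
        List<String> sorted = new ArrayList<>(names);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }

    /** Sorts the names by the integer after the leading 'f' (illustrative only, not Flink code). */
    public static List<String> byNumericSuffix(List<String> names) {
        List<String> sorted = new ArrayList<>(names);
        sorted.sort(Comparator.comparingInt((String n) -> Integer.parseInt(n.substring(1))));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> fields = declared(12);
        // prints [f0, f1, f10, f11, f2, f3, f4, f5, f6, f7, f8, f9]
        System.out.println(lexicographic(fields));
        // prints [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11]
        System.out.println(byNumericSuffix(fields));
    }
}
```

      With an arity of 10 or less both orderings agree, which would explain why the problem only shows up for larger tuples such as Tuple12.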

          People

            twalthr Timo Walther
            jark Jark Wu