Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21725

DataTypeExtractor extracts wrong fields ordering for Tuple12

    XMLWordPrintableJSON

Details

    Description

      The following test can reproduce the problem:

       /** Emit Tuple12 result. */
          public static class JavaTableFuncTuple12
                  extends TableFunction<
                          Tuple12<
                                  String,
                                  String,
                                  String,
                                  String,
                                  String,
                                  String,
                                  Integer,
                                  Integer,
                                  Integer,
                                  Integer,
                                  Integer,
                                  Integer>> {
              private static final long serialVersionUID = -8258882510989374448L;
      
              public void eval(String str) {
                  collect(
                          Tuple12.of(
                                  str + "_a",
                                  str + "_b",
                                  str + "_c",
                                  str + "_d",
                                  str + "_e",
                                  str + "_f",
                                  str.length(),
                                  str.length() + 1,
                                  str.length() + 2,
                                  str.length() + 3,
                                  str.length() + 4,
                                  str.length() + 5));
              }
          }
      
      @Test
        def testCorrelateTuple12(): Unit = {
          val util = streamTestUtil()
          util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          val function = new JavaTableFuncTuple12
          util.addTemporarySystemFunction("func1", function)
          val sql =
            """
              |SELECT *
              |FROM MyTable, LATERAL TABLE(func1(c)) AS T
              |""".stripMargin
      
          util.verifyExecPlan(sql)
        }
      
      // output plan
      Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,f0,f1,f10,f11,f2,f3,f4,f5,f6,f7,f8,f9], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, INTEGER f10, INTEGER f11, VARCHAR(2147483647) f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER f6, INTEGER f7, INTEGER f8, INTEGER f9)], joinType=[INNER])
      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
      

      Note that there is no problem if using the legacy tEnv.registerFunction to register function, becuase it uses TypeInformation. However, it has problem if using tEnv.createTemporaryFunction or CREATE FUNCTION syntax, because it uses TypeInference.

      Note this problem exists in latest 1.11, 1.12, and master branch.

      I think the problem might lay in this line: https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L562

      because it orders field names by alphabetical.

      Attachments

        Issue Links

          Activity

            People

              twalthr Timo Walther
              jark Jark Wu
              Votes:
              1 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: