FLINK-21725

DataTypeExtractor extracts wrong field order for Tuple12

      Description

      The following test can reproduce the problem:

          /** Emit Tuple12 result. */
          public static class JavaTableFuncTuple12
                  extends TableFunction<
                          Tuple12<
                                  String,
                                  String,
                                  String,
                                  String,
                                  String,
                                  String,
                                  Integer,
                                  Integer,
                                  Integer,
                                  Integer,
                                  Integer,
                                  Integer>> {
              private static final long serialVersionUID = -8258882510989374448L;
      
              public void eval(String str) {
                  collect(
                          Tuple12.of(
                                  str + "_a",
                                  str + "_b",
                                  str + "_c",
                                  str + "_d",
                                  str + "_e",
                                  str + "_f",
                                  str.length(),
                                  str.length() + 1,
                                  str.length() + 2,
                                  str.length() + 3,
                                  str.length() + 4,
                                  str.length() + 5));
              }
          }
      
        @Test
        def testCorrelateTuple12(): Unit = {
          val util = streamTestUtil()
          util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          val function = new JavaTableFuncTuple12
          util.addTemporarySystemFunction("func1", function)
          val sql =
            """
              |SELECT *
              |FROM MyTable, LATERAL TABLE(func1(c)) AS T
              |""".stripMargin
      
          util.verifyExecPlan(sql)
        }
      
      // output plan
      Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,f0,f1,f10,f11,f2,f3,f4,f5,f6,f7,f8,f9], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, INTEGER f10, INTEGER f11, VARCHAR(2147483647) f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER f6, INTEGER f7, INTEGER f8, INTEGER f9)], joinType=[INNER])
      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
      

      Note that there is no problem when the function is registered with the legacy tEnv.registerFunction, because that path uses TypeInformation. The problem occurs with tEnv.createTemporaryFunction or the CREATE FUNCTION syntax, because those paths use TypeInference.

      This problem exists in the latest 1.11 and 1.12 releases and on the master branch.

      I think the problem might lie in this line: https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L562

      because it sorts the field names alphabetically, which places f10 and f11 before f2 (as seen in the output plan above).
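
      To illustrate the suspected cause, here is a minimal standalone sketch (not Flink code) of what a plain string sort does to the default tuple field names f0..f11. The class FieldOrderDemo and its methods are hypothetical names made up for this example, and positionalOrder is only one possible way to keep the declared order; the actual fix in DataTypeExtractor may look different.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FieldOrderDemo {

    /** Builds the default tuple field names f0 .. f(arity-1) in positional order. */
    public static List<String> tupleFieldNames(int arity) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            names.add("f" + i);
        }
        return names;
    }

    /** Plain string sort: compares character by character, so "f10" sorts before "f2". */
    public static List<String> lexicographicOrder(List<String> names) {
        List<String> sorted = new ArrayList<>(names);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }

    /** Hypothetical alternative: sort by the numeric suffix to preserve positional order. */
    public static List<String> positionalOrder(List<String> names) {
        List<String> sorted = new ArrayList<>(names);
        sorted.sort(Comparator.comparingInt((String n) -> Integer.parseInt(n.substring(1))));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> names = tupleFieldNames(12);
        // Same ordering as the field list in the plan: f0, f1, f10, f11, f2, ...
        System.out.println(lexicographicOrder(names));
        // Declared order: f0, f1, f2, ..., f11
        System.out.println(positionalOrder(names));
    }
}
```

      The string sort only misbehaves once a field index has two digits, which would explain why tuples with arity up to 10 do not show the problem.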

              People

              • Assignee: Timo Walther (twalthr)
              • Reporter: Jark Wu (jark)