Details
- Type: Bug
- Status: Closed
- Priority: Critical
- Resolution: Fixed
- 1.11.3, 1.12.2, 1.13.0
Description
The following test can reproduce the problem:
/** Emit Tuple12 result. */
public static class JavaTableFuncTuple12
        extends TableFunction<
                Tuple12<
                        String, String, String, String, String, String,
                        Integer, Integer, Integer, Integer, Integer, Integer>> {
    private static final long serialVersionUID = -8258882510989374448L;

    public void eval(String str) {
        collect(
                Tuple12.of(
                        str + "_a",
                        str + "_b",
                        str + "_c",
                        str + "_d",
                        str + "_e",
                        str + "_f",
                        str.length(),
                        str.length() + 1,
                        str.length() + 2,
                        str.length() + 3,
                        str.length() + 4,
                        str.length() + 5));
    }
}
@Test
def testCorrelateTuple12(): Unit = {
  val util = streamTestUtil()
  util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
  val function = new JavaTableFuncTuple12
  util.addTemporarySystemFunction("func1", function)
  val sql =
    """
      |SELECT *
      |FROM MyTable, LATERAL TABLE(func1(c)) AS T
      |""".stripMargin

  util.verifyExecPlan(sql)
}
// output plan
Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,f0,f1,f10,f11,f2,f3,f4,f5,f6,f7,f8,f9], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, INTEGER f10, INTEGER f11, VARCHAR(2147483647) f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER f6, INTEGER f7, INTEGER f8, INTEGER f9)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
Note that there is no problem when the function is registered with the legacy tEnv.registerFunction, because that path uses TypeInformation. However, the problem does occur with tEnv.createTemporaryFunction or the CREATE FUNCTION syntax, because those paths use TypeInference.
Note that this problem exists on the latest 1.11, 1.12, and master branches.
I think the problem might lie in this line: https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L562
because it orders field names alphabetically, which would put f10 and f11 before f2, as seen in the output plan above.
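To illustrate the suspected cause, here is a minimal standalone sketch. It is not Flink code: the class FieldOrderDemo and its methods are hypothetical names used only for this illustration. It shows that sorting the Tuple12 field names as plain strings gives the same order as the plan above, while sorting by the numeric suffix keeps the declared order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo class, not part of Flink.
public class FieldOrderDemo {

    /** Builds the tuple field names f0 .. f(arity-1) in declaration order. */
    public static List<String> fieldNames(int arity) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            names.add("f" + i);
        }
        return names;
    }

    /** Sorts the names as plain strings, so "f10" sorts before "f2". */
    public static List<String> sortedAlphabetically(int arity) {
        List<String> names = fieldNames(arity);
        names.sort(Comparator.naturalOrder());
        return names;
    }

    /** Sorts the names by the numeric index that follows the leading 'f'. */
    public static List<String> sortedByIndex(int arity) {
        List<String> names = fieldNames(arity);
        names.sort(Comparator.comparingInt((String n) -> Integer.parseInt(n.substring(1))));
        return names;
    }

    public static void main(String[] args) {
        // Alphabetical order: [f0, f1, f10, f11, f2, f3, f4, f5, f6, f7, f8, f9]
        System.out.println(sortedAlphabetically(12));
        // Index order: [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11]
        System.out.println(sortedByIndex(12));
    }
}
```

Tuples with at most ten fields (f0 to f9) are unaffected by string ordering, which may be why the issue only shows up with larger tuples such as Tuple12.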