Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.12.0, 1.13.0, 1.14.0
Description
Add the following test case to org.apache.flink.table.api.TableEnvironmentITCase to reproduce this bug.
@Test def myTest(): Unit = { val id1 = TestValuesTableFactory.registerData( Seq(Row.of("abc", LocalDateTime.of(2000, 1, 1, 0, 0)))) val ddl1 = s""" |CREATE TABLE Ta ( | id VARCHAR, | ts TIMESTAMP, | proc AS PROCTIME() |) WITH ( | 'connector' = 'values', | 'data-id' = '$id1', | 'bounded' = 'true' |) |""".stripMargin tEnv.executeSql(ddl1) val id2 = TestValuesTableFactory.registerData( Seq(Row.of("abc", LocalDateTime.of(2000, 1, 2, 0, 0)))) val ddl2 = s""" |CREATE TABLE Tb ( | id VARCHAR, | ts TIMESTAMP |) WITH ( | 'connector' = 'values', | 'data-id' = '$id2', | 'bounded' = 'true' |) |""".stripMargin tEnv.executeSql(ddl2) val it = tEnv.executeSql( """ |SELECT * FROM Ta AS t1 |INNER JOIN Tb FOR SYSTEM_TIME AS OF t1.proc AS t2 |ON t1.id = t2.id |WHERE CAST(coalesce(t1.ts, t2.ts) AS VARCHAR) >= CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00') |""".stripMargin).collect() while (it.hasNext) { System.out.println(it.next()) } }
The result is
+I[abc, 2000-01-01T00:00, 2021-05-20T14:30:47.735Z, abc, 2000-01-02T00:00]
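which is obviously incorrect: coalesce(t1.ts, t2.ts) evaluates to 2000-01-01 00:00:00, which is earlier than the current date, so the WHERE condition should have filtered this row out and the result should be empty.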
The generated collector code is as follows:
public class JoinTableFuncCollector$22 extends org.apache.flink.table.runtime.collector.TableFunctionCollector { org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2); org.apache.flink.table.data.utils.JoinedRowData joinedRow$9 = new org.apache.flink.table.data.utils.JoinedRowData(); private static final java.util.TimeZone timeZone = java.util.TimeZone.getTimeZone("Asia/Shanghai"); private org.apache.flink.table.data.TimestampData timestamp; private org.apache.flink.table.data.TimestampData localTimestamp; private int date; private final org.apache.flink.table.data.binary.BinaryStringData str$17 = org.apache.flink.table.data.binary.BinaryStringData.fromString(" 00:00:00"); public JoinTableFuncCollector$22(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public void collect(Object record) throws Exception { org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) getInput(); org.apache.flink.table.data.RowData in2 = (org.apache.flink.table.data.RowData) record; org.apache.flink.table.data.binary.BinaryStringData field$7; boolean isNull$7; org.apache.flink.table.data.TimestampData field$8; boolean isNull$8; org.apache.flink.table.data.TimestampData field$10; boolean isNull$10; boolean isNull$13; org.apache.flink.table.data.binary.BinaryStringData result$14; boolean isNull$15; org.apache.flink.table.data.binary.BinaryStringData result$16; boolean isNull$18; org.apache.flink.table.data.binary.BinaryStringData result$19; boolean isNull$20; boolean result$21; isNull$8 = in2.isNullAt(1); field$8 = null; if (!isNull$8) { field$8 = in2.getTimestamp(1, 6); } isNull$7 = in2.isNullAt(0); field$7 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$7) { field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) in2.getString(0)); } isNull$10 = in1.isNullAt(1); field$10 = 
null; if (!isNull$10) { field$10 = in1.getTimestamp(1, 6); } boolean result$11 = !isNull$10; org.apache.flink.table.data.TimestampData result$12 = null; boolean isNull$12; if (result$11) { isNull$12 = isNull$10; if (!isNull$12) { result$12 = field$10; } } else { isNull$12 = isNull$8; if (!isNull$12) { result$12 = field$8; } } isNull$13 = isNull$12; result$14 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$13) { result$14 = org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToString(result$12, 6)); isNull$13 = (result$14 == null); } isNull$15 = false; result$16 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$15) { result$16 = org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString(((int) date))); isNull$15 = (result$16 == null); } result$19 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$15 ) ? null : (result$16), ( false ) ? null : (((org.apache.flink.table.data.binary.BinaryStringData) str$17))); isNull$18 = (result$19 == null); if (isNull$18) { result$19 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; } isNull$20 = isNull$13 || isNull$18; result$21 = false; if (!isNull$20) { result$21 = ((result$14 == null) ? ((result$19 == null) ? 0 : -1) : ((result$19 == null) ? 1 : (result$14.compareTo(result$19)))) >= 0; } if (result$21) { if (isNull$7) { out.setField(0, null); } else { out.setField(0, field$7); } if (isNull$8) { out.setField(1, null); } else { out.setField(1, field$8); } joinedRow$9.replace(in1, out); joinedRow$9.setRowKind(in1.getRowKind()); outputResult(joinedRow$9); } } @Override public void close() throws Exception { } }
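The member variable date is never assigned before it is read in collect(), so it still holds the Java default value 0 (which unixDateToString renders as 1970-01-01) instead of the current date. The filter therefore compares against "1970-01-01 00:00:00" and wrongly passes, thus causing this bug.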
This is because LookupJoinCodeGenerator#generateTableFunctionCollectorForJoinTable forgets to include ${ctx.reusePerRecordCode()} when generating the collect method.
Attachments
Issue Links
- links to