Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-22730

Lookup join condition with CURRENT_DATE fails to filter records

    XMLWordPrintableJSON

Details

    Description

      Add the following test case to org.apache.flink.table.api.TableEnvironmentITCase to reproduce this bug.

      @Test
      def myTest(): Unit = {
        val id1 = TestValuesTableFactory.registerData(
          Seq(Row.of("abc", LocalDateTime.of(2000, 1, 1, 0, 0))))
        val ddl1 =
          s"""
             |CREATE TABLE Ta (
             |  id VARCHAR,
             |  ts TIMESTAMP,
             |  proc AS PROCTIME()
             |) WITH (
             |  'connector' = 'values',
             |  'data-id' = '$id1',
             |  'bounded' = 'true'
             |)
             |""".stripMargin
        tEnv.executeSql(ddl1)
      
        val id2 = TestValuesTableFactory.registerData(
          Seq(Row.of("abc", LocalDateTime.of(2000, 1, 2, 0, 0))))
        val ddl2 =
          s"""
             |CREATE TABLE Tb (
             |  id VARCHAR,
             |  ts TIMESTAMP
             |) WITH (
             |  'connector' = 'values',
             |  'data-id' = '$id2',
             |  'bounded' = 'true'
             |)
             |""".stripMargin
        tEnv.executeSql(ddl2)
      
        val it = tEnv.executeSql(
          """
            |SELECT * FROM Ta AS t1
            |INNER JOIN Tb FOR SYSTEM_TIME AS OF t1.proc AS t2
            |ON t1.id = t2.id
            |WHERE CAST(coalesce(t1.ts, t2.ts) AS VARCHAR) >= CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00')
            |""".stripMargin).collect()
      
        while (it.hasNext) {
          System.out.println(it.next())
        }
      }
      

      The result is

      +I[abc, 2000-01-01T00:00, 2021-05-20T14:30:47.735Z, abc, 2000-01-02T00:00]
      

      which is obviously incorrect: both timestamps are from the year 2000, well before CURRENT_DATE, so the WHERE clause should have filtered this record out and the query should have returned no rows.

      The generated operator is as follows

      public class JoinTableFuncCollector$22 extends org.apache.flink.table.runtime.collector.TableFunctionCollector {
      
          org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2);
          org.apache.flink.table.data.utils.JoinedRowData joinedRow$9 = new org.apache.flink.table.data.utils.JoinedRowData();
          private static final java.util.TimeZone timeZone =
                  java.util.TimeZone.getTimeZone("Asia/Shanghai");
          private org.apache.flink.table.data.TimestampData timestamp;
          private org.apache.flink.table.data.TimestampData localTimestamp;
          private int date;
      
          private final org.apache.flink.table.data.binary.BinaryStringData str$17 = org.apache.flink.table.data.binary.BinaryStringData.fromString(" 00:00:00");
      
      
          public JoinTableFuncCollector$22(Object[] references) throws Exception {
      
          }
      
          @Override
          public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
      
          }
      
          @Override
          public void collect(Object record) throws Exception {
              org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) getInput();
              org.apache.flink.table.data.RowData in2 = (org.apache.flink.table.data.RowData) record;
              org.apache.flink.table.data.binary.BinaryStringData field$7;
              boolean isNull$7;
              org.apache.flink.table.data.TimestampData field$8;
              boolean isNull$8;
              org.apache.flink.table.data.TimestampData field$10;
              boolean isNull$10;
              boolean isNull$13;
              org.apache.flink.table.data.binary.BinaryStringData result$14;
              boolean isNull$15;
              org.apache.flink.table.data.binary.BinaryStringData result$16;
              boolean isNull$18;
              org.apache.flink.table.data.binary.BinaryStringData result$19;
              boolean isNull$20;
              boolean result$21;
              isNull$8 = in2.isNullAt(1);
              field$8 = null;
              if (!isNull$8) {
                  field$8 = in2.getTimestamp(1, 6);
              }
              isNull$7 = in2.isNullAt(0);
              field$7 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
              if (!isNull$7) {
                  field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) in2.getString(0));
              }
              isNull$10 = in1.isNullAt(1);
              field$10 = null;
              if (!isNull$10) {
                  field$10 = in1.getTimestamp(1, 6);
              }
      
      
      
              boolean result$11 = !isNull$10;
              org.apache.flink.table.data.TimestampData result$12 = null;
              boolean isNull$12;
              if (result$11) {
      
                  isNull$12 = isNull$10;
                  if (!isNull$12) {
                      result$12 = field$10;
                  }
              }
              else {
      
                  isNull$12 = isNull$8;
                  if (!isNull$12) {
                      result$12 = field$8;
                  }
              }
              isNull$13 = isNull$12;
              result$14 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
              if (!isNull$13) {
      
                  result$14 = org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToString(result$12, 6));
                  isNull$13 = (result$14 == null);
              }
      
      
      
      
              isNull$15 = false;
              result$16 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
              if (!isNull$15) {
      
                  result$16 = org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString(((int) date)));
                  isNull$15 = (result$16 == null);
              }
      
      
              result$19 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$15 ) ? null : (result$16), ( false ) ? null : (((org.apache.flink.table.data.binary.BinaryStringData) str$17)));
              isNull$18 = (result$19 == null);
              if (isNull$18) {
                  result$19 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
              }
      
              isNull$20 = isNull$13 || isNull$18;
              result$21 = false;
              if (!isNull$20) {
      
                  result$21 = ((result$14 == null) ? ((result$19 == null) ? 0 : -1) : ((result$19 == null) ? 1 : (result$14.compareTo(result$19)))) >= 0;
      
              }
      
              if (result$21) {
      
      
      
      
      
                  if (isNull$7) {
                      out.setField(0, null);
                  } else {
                      out.setField(0, field$7);
                  }
      
      
      
                  if (isNull$8) {
                      out.setField(1, null);
                  } else {
                      out.setField(1, field$8);
                  }
      
      
                  joinedRow$9.replace(in1, out);
                  joinedRow$9.setRowKind(in1.getRowKind());
                  outputResult(joinedRow$9);
      
              }
      
          }
      
          @Override
          public void close() throws Exception {
      
          }
      }
      

      The member variable date is never initialized before use. Since Java default-initializes int fields to 0, unixDateToString(date) yields "1970-01-01" instead of the actual CURRENT_DATE, so the comparison "2000-01-01 00:00:00" >= "1970-01-01 00:00:00" evaluates to true and the record is wrongly emitted.

      This happens because LookupJoinCodeGenerator#generateTableFunctionCollectorForJoinTable omits ${ctx.reusePerRecordCode()} when generating the collect method, so the per-record code that assigns date (and the other time-related fields) is never emitted into the generated collector.

      Attachments

        Issue Links

          Activity

            People

              TsReaper Caizhi Weng
              TsReaper Caizhi Weng
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: