Flink / FLINK-28988

Incorrect result for filter after temporal join

Details

    • After FLINK-28988 is applied, the filter is no longer pushed down into both inputs of the event-time temporal join.
      Note that this may cause plan changes that are incompatible with 1.16.0. For example, when the left input is an upsert source (using the upsert-kafka connector), the query plan no longer contains the ChangelogNormalize node that appeared in 1.16.0.
    • Important

    Description

      The following code reproduces the issue:

       

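      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.api.java.tuple.Tuple3;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.api.Schema;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

      import java.time.Instant;

      import static org.apache.flink.table.api.Expressions.$;

      public class TemporalJoinSQLExample1 {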
      
          public static void main(String[] args) throws Exception {
      
              // set up the Java DataStream API
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              // set up the Java Table API
              final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
      
              final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                      env.fromElements(
                              new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                              new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                              new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
      
              final Table table =
                      tableEnv.fromDataStream(
                                      ds,
                                      Schema.newBuilder()
                                              .column("f0", DataTypes.INT())
                                              .column("f1", DataTypes.STRING())
                                              .column("f2", DataTypes.TIMESTAMP_LTZ(3))
                                              .watermark("f2", "f2 - INTERVAL '2' SECONDS")
                                              .build())
                              .as("id", "state", "ts");
              tableEnv.createTemporaryView("source_table", table);
              final Table dedupeTable =
                      tableEnv.sqlQuery(
                              "SELECT * FROM ("
                                      + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
                                      + ") WHERE row_num = 1");
              tableEnv.createTemporaryView("versioned_table", dedupeTable);
      
              DataStreamSource<Tuple2<Integer, Instant>> event =
                      env.fromElements(
                              new Tuple2<>(0, Instant.ofEpochMilli(0)),
                              new Tuple2<>(0, Instant.ofEpochMilli(5)),
                              new Tuple2<>(0, Instant.ofEpochMilli(10)),
                              new Tuple2<>(0, Instant.ofEpochMilli(15)),
                              new Tuple2<>(0, Instant.ofEpochMilli(20)),
                              new Tuple2<>(0, Instant.ofEpochMilli(25)));
      
              final Table eventTable =
                      tableEnv.fromDataStream(
                                      event,
                                      Schema.newBuilder()
                                              .column("f0", DataTypes.INT())
                                              .column("f1", DataTypes.TIMESTAMP_LTZ(3))
                                              .watermark("f1", "f1 - INTERVAL '2' SECONDS")
                                              .build())
                              .as("id", "ts");
      
              tableEnv.createTemporaryView("event_table", eventTable);
      
              final Table result =
                      tableEnv.sqlQuery(
                              "SELECT * FROM event_table"
                                      + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                                      + " ON event_table.id = versioned_table.id");
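              // Unfiltered temporal join: prints all six rows.
              result.execute().print();

              // Same join with a filter on top: four rows with state 'online' are expected.
              result.filter($("state").isEqual("online")).execute().print();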
          }
      } 

       

      The temporal join produces the following result:

      op          id                      ts         id0                          state                     ts0              row_num
      +I           0 1970-01-01 08:00:00.000           0                         online 1970-01-01 08:00:00.000                    1
      +I           0 1970-01-01 08:00:00.005           0                         online 1970-01-01 08:00:00.000                    1
      +I           0 1970-01-01 08:00:00.010           0                        offline 1970-01-01 08:00:00.010                    1
      +I           0 1970-01-01 08:00:00.015           0                        offline 1970-01-01 08:00:00.010                    1
      +I           0 1970-01-01 08:00:00.020           0                         online 1970-01-01 08:00:00.020                    1
      +I           0 1970-01-01 08:00:00.025           0                         online 1970-01-01 08:00:00.020                    1

       

      After filtering with the predicate state = 'online', I expect only the two rows with state 'offline' to be filtered out, leaving four rows. Instead, I got the following result:

      op          id                      ts         id0                          state                     ts0              row_num
      +I           0 1970-01-01 08:00:00.020           0                         online 1970-01-01 08:00:00.020                    1
      +I           0 1970-01-01 08:00:00.025           0                         online 1970-01-01 08:00:00.020                    1
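
For comparison, the expected semantics can be sketched in plain Java, independent of Flink: each event is first joined to the latest version at or before its timestamp, and the filter is evaluated on the joined row afterwards. This is only an illustration of the intended result, not of how the planner executes the query. The class name `TemporalJoinSketch` and the method `joinThenFilter` are hypothetical, and the sketch assumes a single key (id = 0), as in the reproduction above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TemporalJoinSketch {

    /**
     * Returns the timestamps (epoch millis) of the events whose joined
     * version has the wanted state. Hypothetical helper for illustration.
     */
    static List<Long> joinThenFilter(
            TreeMap<Long, String> versions, List<Long> eventTimes, String wantedState) {
        List<Long> kept = new ArrayList<>();
        for (long eventTs : eventTimes) {
            // Temporal join: pick the latest version whose timestamp is <= the event time.
            Map.Entry<Long, String> version = versions.floorEntry(eventTs);
            // The filter is applied to the joined row, after the version lookup.
            if (version != null && wantedState.equals(version.getValue())) {
                kept.add(eventTs);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Versions of the row with id = 0, keyed by their timestamp in millis.
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(0L, "online");
        versions.put(10L, "offline");
        versions.put(20L, "online");

        List<Long> eventTimes = List.of(0L, 5L, 10L, 15L, 20L, 25L);
        System.out.println(joinThenFilter(versions, eventTimes, "online"));
        // prints [0, 5, 20, 25]
    }
}
```

The four timestamps correspond to the four 'online' rows of the unfiltered join result, whereas the reported output contains only the last two.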

       
       
       

       

          People

            csq Shuiqiang Chen
            xuannan Xuannan Su