Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
1.15.1
Important
Description
The following code reproduces the issue:
import static org.apache.flink.table.api.Expressions.$;

import java.time.Instant;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TemporalJoinSQLExample1 {

    public static void main(String[] args) throws Exception {
        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the Java Table API
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // versioned side: the state of id 0 goes online -> offline -> online
        final DataStreamSource<Tuple3<Integer, String, Instant>> ds = env.fromElements(
                new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));

        final Table table = tableEnv.fromDataStream(
                        ds,
                        Schema.newBuilder()
                                .column("f0", DataTypes.INT())
                                .column("f1", DataTypes.STRING())
                                .column("f2", DataTypes.TIMESTAMP_LTZ(3))
                                .watermark("f2", "f2 - INTERVAL '2' SECONDS")
                                .build())
                .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);

        // keep only the latest row per id, so the view can serve as a versioned table
        final Table dedupeTable = tableEnv.sqlQuery(
                "SELECT * FROM ("
                        + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
                        + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        // probe side: one event for id 0 every 5 ms
        DataStreamSource<Tuple2<Integer, Instant>> event = env.fromElements(
                new Tuple2<>(0, Instant.ofEpochMilli(0)),
                new Tuple2<>(0, Instant.ofEpochMilli(5)),
                new Tuple2<>(0, Instant.ofEpochMilli(10)),
                new Tuple2<>(0, Instant.ofEpochMilli(15)),
                new Tuple2<>(0, Instant.ofEpochMilli(20)),
                new Tuple2<>(0, Instant.ofEpochMilli(25)));

        final Table eventTable = tableEnv.fromDataStream(
                        event,
                        Schema.newBuilder()
                                .column("f0", DataTypes.INT())
                                .column("f1", DataTypes.TIMESTAMP_LTZ(3))
                                .watermark("f1", "f1 - INTERVAL '2' SECONDS")
                                .build())
                .as("id", "ts");
        tableEnv.createTemporaryView("event_table", eventTable);

        // event-time temporal join: each event picks the version valid at event_table.ts
        final Table result = tableEnv.sqlQuery(
                "SELECT * FROM event_table"
                        + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                        + " ON event_table.id = versioned_table.id");
        result.execute().print();

        // the same join, filtered on the joined state
        result.filter($("state").isEqual("online")).execute().print();
    }
}
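The versioned_table view relies on the ROW_NUMBER deduplication (keep the latest row per id, ordered by ts) to turn the append-only source_table into an updating changelog keyed by id. The sketch below is a simplified illustration of that keep-last-row changelog for the three source rows. It uses retract-style -U/+U pairs. The class and record names are hypothetical. The change kinds that Flink's planner actually emits may differ; for example, it may omit the -U records when the downstream operator does not need them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupChangelogSketch {

    // Hypothetical model of a source_table row: (id, state, ts in epoch millis).
    record Row(int id, String state, long ts) {}

    // A change record; kind is "+I", "-U" or "+U", as in Flink's printed changelog.
    record Change(String kind, Row row) {}

    // Simplified keep-last-row deduplication per key: the first row for a key is an
    // insert; each later row retracts the previous latest row and emits the new one.
    // Assumes rows arrive in ascending ts order (true for the repro input).
    static List<Change> keepLastRow(List<Row> input) {
        Map<Integer, Row> latest = new LinkedHashMap<>();
        List<Change> changelog = new ArrayList<>();
        for (Row row : input) {
            Row previous = latest.put(row.id(), row);
            if (previous == null) {
                changelog.add(new Change("+I", row));
            } else {
                changelog.add(new Change("-U", previous));
                changelog.add(new Change("+U", row));
            }
        }
        return changelog;
    }

    public static void main(String[] args) {
        List<Row> source = List.of(
                new Row(0, "online", 0),
                new Row(0, "offline", 10),
                new Row(0, "online", 20));
        for (Change change : keepLastRow(source)) {
            System.out.println(change.kind() + " " + change.row());
        }
    }
}
```

In this model, the current version of id 0 is online from ts 0, offline from ts 10 and online again from ts 20.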
The temporal join without the filter produces the following result:
op | id | ts | id0 | state | ts0 | row_num |
+I | 0 | 1970-01-01 08:00:00.000 | 0 | online | 1970-01-01 08:00:00.000 | 1 |
+I | 0 | 1970-01-01 08:00:00.005 | 0 | online | 1970-01-01 08:00:00.000 | 1 |
+I | 0 | 1970-01-01 08:00:00.010 | 0 | offline | 1970-01-01 08:00:00.010 | 1 |
+I | 0 | 1970-01-01 08:00:00.015 | 0 | offline | 1970-01-01 08:00:00.010 | 1 |
+I | 0 | 1970-01-01 08:00:00.020 | 0 | online | 1970-01-01 08:00:00.020 | 1 |
+I | 0 | 1970-01-01 08:00:00.025 | 0 | online | 1970-01-01 08:00:00.020 | 1 |
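This output matches event-time temporal join semantics. Each event row is joined with the latest version of id 0 whose ts is not later than the event's ts. The following is a minimal model of that lookup for a single key, using a TreeMap keyed by version time. The class and method names are illustrative and are not Flink API.

```java
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class TemporalJoinSketch {

    // The version valid at a given event time is the latest version whose
    // timestamp is <= that time; returns null if no version exists yet.
    static String stateAsOf(NavigableMap<Long, String> versions, long eventTs) {
        Map.Entry<Long, String> version = versions.floorEntry(eventTs);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        // Versions of id 0 from source_table: ts (epoch millis) -> state.
        NavigableMap<Long, String> versions = new TreeMap<>();
        versions.put(0L, "online");
        versions.put(10L, "offline");
        versions.put(20L, "online");

        // Event times of the event_table rows (all for id 0).
        List<Long> events = List.of(0L, 5L, 10L, 15L, 20L, 25L);
        for (long ts : events) {
            System.out.println(ts + " -> " + stateAsOf(versions, ts));
        }
    }
}
```

Running main prints one line per event. The states are online, online, offline, offline, online, online, matching the state column of the table.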
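After applying the filter predicate state = 'online', I expected only the two rows whose state is 'offline' to be removed, leaving four rows. Instead, the rows for the events at 08:00:00.000 and 08:00:00.005, which joined to an 'online' version, are dropped as well, and only two rows remain: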
op | id | ts | id0 | state | ts0 | row_num |
+I | 0 | 1970-01-01 08:00:00.020 | 0 | online | 1970-01-01 08:00:00.020 | 1 |
+I | 0 | 1970-01-01 08:00:00.025 | 0 | online | 1970-01-01 08:00:00.020 | 1 |
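For comparison, the expected semantics treat the filter as a plain selection over the six joined rows. Under that reading, the filter removes the 'offline' rows and does not change which version each event joins to. The sketch below hardcodes those six rows. It uses eventTs as the millisecond offset shown in the ts column. The record and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ExpectedFilterSketch {

    // One row of the unfiltered temporal-join result: event time (ms) and joined state.
    record JoinedRow(long eventTs, String state) {}

    // state = 'online' as a selection over the join output: drop rows whose
    // joined version is not 'online', keep everything else unchanged.
    static List<JoinedRow> filterOnline(List<JoinedRow> joined) {
        return joined.stream()
                .filter(r -> "online".equals(r.state()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The six rows of the unfiltered temporal-join output.
        List<JoinedRow> joined = List.of(
                new JoinedRow(0, "online"), new JoinedRow(5, "online"),
                new JoinedRow(10, "offline"), new JoinedRow(15, "offline"),
                new JoinedRow(20, "online"), new JoinedRow(25, "online"));
        List<JoinedRow> expected = filterOnline(joined);
        System.out.println(expected.size() + " rows: " + expected);
    }
}
```

This keeps the events at 0, 5, 20 and 25 ms, which is four rows. The actual output contains only the rows for 20 and 25 ms.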