Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-23379

interval left join null value result out of order

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.12.2
    • 1.20.0
    • Table SQL / Runtime
    • None

    Description

      • Scenes:
        The person main table is LEFT interval-joined with the associated message information table.
        An earlier record that has no match in the message information table is emitted later than a subsequent record that does have a match.
        When a normal (matched) output and a null-value output share the same primary key, they are emitted out of order: the null-value output arrives after the normal output, producing incorrect results.

      Input:

      {"id": 1, "name":"chencc2", "message": "good boy2", "ts":"2021-03-26 18:56:43"} {"id": 1, "name":"chencc2", "age": "28", "ts":"2021-03-26 19:02:47"} {"id": 1, "name":"chencc2", "message": "good boy3", "ts":"2021-03-26 19:06:43"} {"id": 1, "name":"chencc2", "age": "27", "ts":"2021-03-26 19:06:47"}

      Output:
      +I(chencc2,27,2021-03-26T19:06:47,good boy3,2021-03-26 19:06:43.000)
      +I(chencc2,28,2021-03-26T19:02:47,null,null)

      The second output record has timestamp 19:02, which is earlier than the first output record's 19:06, yet it is emitted later, causing data update errors.

       

      •  code
      // Reproduction step 1: (re)create the Kafka-backed source table holding person records.
      tableEnv.executeSql("drop table if exists persons_table_kafka2");
               String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
                       "  `id` BIGINT,\n" +
                       "  `name` STRING,\n" +
                       "  `age` INT,\n" +
                       // Computed processing-time column; it is not referenced by the view defined below.
                       "  proctime as PROCTIME(),\n" +
                       "  `ts` TIMESTAMP(3),\n" +
                       // The watermark expression is exactly `ts` (no delay is subtracted).
                       " WATERMARK FOR ts AS ts\n" +
                       ") WITH (\n" +
                       "  'connector' = 'kafka',\n" +
                       "  'topic' = 'persons_test2',\n" +
                       "  'properties.bootstrap.servers' = 'node2:6667',\n" +
                       "  'properties.group.id' = 'testGroa115',\n" +
                       "  'scan.startup.mode' = 'group-offsets',\n" +
                       "  'format' = 'json'\n" +
                       ")";
               tableEnv.executeSql(kafka_source_sql);
              // Reproduction step 2: (re)create the Kafka-backed source table holding the per-person messages.
              tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
               String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
                       "  `id` BIGINT,\n" +
                       "  `name` STRING,\n" +
                       "  `message` STRING,\n" +
                       // Unlike its neighbours, this literal has no trailing "\n"; the next active literal
                       // starts with a space, so the generated SQL is still whitespace-separated.
                       "  `ts` TIMESTAMP(3) ," +
       // Alternative watermark with a 5-second delay, left disabled in this reproduction:
       //                " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
                       " WATERMARK FOR ts AS ts\n" +
                       ") WITH (\n" +
                       "  'connector' = 'kafka',\n" +
                       "  'topic' = 'persons_extra_message2',\n" +
                       "  'properties.bootstrap.servers' = 'node2:6667',\n" +
                       "  'properties.group.id' = 'testGroud2e313',\n" +
                       "  'scan.startup.mode' = 'group-offsets',\n" +
                       "  'format' = 'json'\n" +
                       ")";
               tableEnv.executeSql(kafka_source_sql2);
              // Reproduction step 3: define the view under test. Persons (t1) are LEFT joined to
              // messages (t2) on equal `name`, with t1.ts constrained to the closed range
              // [t2.ts, t2.ts + 3 minutes]. t2.ts is cast to STRING in the projection.
              tableEnv.executeSql("" +
                       "CREATE TEMPORARY VIEW result_data_view " +
                       " as " +
                       " select " +
                       " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
                       " from persons_table_kafka2 t1 " +
                       " left  join persons_message_table_kafka2 t2 on t1.name = t2.name and t1.ts between " +
                               " t2.ts and t2.ts +  INTERVAL '3' MINUTE"
                       );
              // Reproduction step 4: read the view as an append stream of RowData and print each row;
              // the printed rows are the "Output" quoted in the description.
              Table resultTable = tableEnv.from("result_data_view");
               DataStream<RowData> rowDataDataStream = tableEnv.toAppendStream(resultTable, RowData.class);
               rowDataDataStream.print();
      

       

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            waywtdcc waywtdcc
            Votes:
            0 Vote for this issue
            Watchers:
            7 Start watching this issue

            Dates

              Created:
              Updated: