Flink / FLINK-20374

Wrong result when shuffling changelog stream on non-primary-key columns

Details

      The 1.13.3 release aims to fix various primary key issues that effectively made it impossible to use this feature. The change might affect savepoint backwards compatibility for those incorrect pipelines. Also the resulting changelog stream might be different after these changes. Pipelines that were correct before should be restorable from a savepoint.

    Description

      This was reported on the user-zh mailing list: http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html

      CREATE TABLE test (
      `id` INT,
      `name` VARCHAR(255),
      `time` TIMESTAMP(3),
      `status` INT,
      PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'root',
        'password' = '1',
        'database-name' = 'ai_audio_lyric_task',
        'table-name' = 'test'
      );
      
      CREATE TABLE status (
      `id` INT,
      `name` VARCHAR(255),
      PRIMARY KEY(id) NOT ENFORCED
      ) WITH (  
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'root',
        'password' = '1',
        'database-name' = 'ai_audio_lyric_task',
        'table-name' = 'status'
      );
      
      -- output
      CREATE TABLE test_status (
      `id` INT,
      `name` VARCHAR(255),
      `time` TIMESTAMP(3),
      `status` INT,
      `status_name` VARCHAR(255),
      PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'xxx',
        'index' = 'xxx',
        'username' = 'xxx',
        'password' = 'xxx',
        'sink.bulk-flush.backoff.max-retries' = '100000',
        'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
        'sink.bulk-flush.max-actions' = '5000',
        'sink.bulk-flush.max-size' = '10mb',
        'sink.bulk-flush.interval' = '1s'
      );
      
      INSERT INTO test_status
      SELECT t.*, s.name
      FROM test AS t
      LEFT JOIN status AS s ON t.status = s.id;
      

      Data in the MySQL tables:

      test:
      0, name0, 2020-07-06 00:00:00 , 0
      1, name1, 2020-07-06 00:00:00 , 1
      2, name2, 2020-07-06 00:00:00 , 1
      .....
      
      status:
      0, status0
      1, status1
      2, status2
      .....
      

      Operations:
      1. Start the job with parallelism=40. The result in the test_status sink is correct:

          0, name0, 2020-07-06 00:00:00 , 0, status0
          1, name1, 2020-07-06 00:00:00 , 1, status1
          2, name2, 2020-07-06 00:00:00 , 1, status1
      

      2. Update the status of the id=2 record in table test from 1 to 2.
      3. The result is now incorrect: the id=2 record is missing from the sink.

      The reason is that the changelog of test is shuffled on the status column, which is not the primary key. The ordering of changes for the same key therefore cannot be guaranteed, and the result is wrong.
      The -U[2, name2, 2020-07-06 00:00:00 , 1] and +U[2, name2, 2020-07-06 00:00:00 , 2] may be shuffled to different join tasks, so the order of the joined results is not guaranteed when they arrive at the sink task. It is possible that +U[2, name2, 2020-07-06 00:00:00 , status2] arrives first and -U[2, name2, 2020-07-06 00:00:00 , status1] arrives afterwards. The late retraction then removes the row, and the id=2 record is missing in Elasticsearch.
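
      The race described above can be reproduced with a small simulation. This is only a sketch and does not use Flink: the function apply_to_upsert_sink and the rule that a -U/-D row is applied as a delete by key are assumptions made for illustration, and the rows include both the status and status_name columns of the sink schema.

```python
def apply_to_upsert_sink(changes):
    """Apply changelog rows to a key-value sink keyed by id (the first field).

    Assumption for this sketch: +I/+U upsert the row, -U/-D delete by key.
    """
    index = {}
    for kind, row in changes:
        key = row[0]
        if kind in ("+I", "+U"):
            index[key] = row
        else:
            index.pop(key, None)
    return index


# Joined rows for id=2 before and after the status update from 1 to 2.
insert = ("+I", (2, "name2", "2020-07-06 00:00:00", 1, "status1"))
retract = ("-U", (2, "name2", "2020-07-06 00:00:00", 1, "status1"))
update = ("+U", (2, "name2", "2020-07-06 00:00:00", 2, "status2"))

# Both changes pass through the same join task: the retraction arrives first.
in_order = apply_to_upsert_sink([insert, retract, update])

# The changes pass through different join tasks (status=1 vs. status=2)
# and the retraction reaches the sink last.
out_of_order = apply_to_upsert_sink([insert, update, retract])

print("in order:    ", in_order)
print("out of order:", out_of_order)
```

      In the in-order case the sink ends up holding the status2 row for id=2. In the out-of-order case the sink ends up with no row for id=2, which matches the symptom reported above.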

      It seems that we need a changelog ordering mechanism in the planner.
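
      One possible shape for such a mechanism is to re-partition by the sink primary key and keep, per key, the rows that have been added but not yet retracted. The sketch below is only an illustration under that assumption and is not taken from the planner: the class KeyedMaterializer and its process method are hypothetical names, and the sink is modelled as a plain dictionary.

```python
class KeyedMaterializer:
    """Hypothetical per-key buffer placed in front of an upsert sink."""

    def __init__(self):
        self.rows_by_key = {}  # key -> rows added and not yet retracted
        self.sink = {}         # stand-in for the external upsert sink

    def process(self, kind, row):
        key = row[0]
        rows = self.rows_by_key.setdefault(key, [])
        if kind in ("+I", "+U"):
            rows.append(row)
        elif row in rows:
            # Retract exactly the matching row, not whatever is latest.
            rows.remove(row)
        if rows:
            # The most recently added surviving row is the current value.
            self.sink[key] = rows[-1]
        else:
            self.sink.pop(key, None)
            self.rows_by_key.pop(key, None)


old_row = (2, "name2", "2020-07-06 00:00:00", 1, "status1")
new_row = (2, "name2", "2020-07-06 00:00:00", 2, "status2")

materializer = KeyedMaterializer()
# Same out-of-order arrival as in the report: +U before the matching -U.
for kind, row in [("+I", old_row), ("+U", new_row), ("-U", old_row)]:
    materializer.process(kind, row)

print(materializer.sink)
```

      Because the retraction removes only the row it matches, the late -U for status1 no longer deletes the status2 row. The sketch simply ignores a retraction that arrives before the row it retracts, so it is not a complete solution.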

            People

              lzljs3620320 Jingsong Lee
              jark Jark Wu