Details
- Type: Bug
- Status: Closed
- Priority: Critical
- Resolution: Fixed
Description
This was reported on the user-zh mailing list: http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html
CREATE TABLE test (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
);

CREATE TABLE status (
  `id` INT,
  `name` VARCHAR(255),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

-- output
CREATE TABLE test_status (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  `status_name` VARCHAR(255),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '100000',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);

INSERT INTO test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
Data in the MySQL tables:

test:
  0, name0, 2020-07-06 00:00:00, 0
  1, name1, 2020-07-06 00:00:00, 1
  2, name2, 2020-07-06 00:00:00, 1
  ...

status:
  0, status0
  1, status1
  2, status2
  ...
Operations:
1. Start the job with parallelism=40; the result in the test_status sink is correct:
   0, name0, 2020-07-06 00:00:00, 0, status0
   1, name1, 2020-07-06 00:00:00, 1, status1
   2, name2, 2020-07-06 00:00:00, 1, status1
2. Update the status of the id=2 record in table test from 1 to 2 (see the statement after this list).
3. The result is now incorrect: the id=2 record is missing from the sink.
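The update in step 2, written as a plain MySQL statement against the source database:

  -- change the status of the id=2 row; the CDC source turns this
  -- into a -U/+U changelog pair
  UPDATE test SET status = 2 WHERE id = 2;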
The reason is that the changelog of test is shuffled on the status column, which is not the primary key. Therefore, the per-key ordering of the changelog cannot be guaranteed, and the result is wrong.

The -U[2, name2, 2020-07-06 00:00:00, 1] and +U[2, name2, 2020-07-06 00:00:00, 2] records may be shuffled to different join tasks, so the order of the joined results is not guaranteed when they arrive at the sink task. It is possible that +U[2, name2, 2020-07-06 00:00:00, status2] arrives first and -U[2, name2, 2020-07-06 00:00:00, status1] arrives second; since both records carry the same key id=2, the late -U deletes the freshly upserted document, and the id=2 record is missing in Elasticsearch.
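One way to see where the shuffle happens is to print the plan (a minimal sketch; EXPLAIN is standard Flink SQL, though the exact operator rendering varies by version):

  EXPLAIN INSERT INTO test_status
  SELECT t.*, s.name
  FROM test AS t
  LEFT JOIN status AS s ON t.status = s.id;

The physical plan contains an Exchange on the test input that is hash-distributed on status rather than on the primary key id, which is exactly the shuffle that breaks the per-key ordering described above.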
It seems that we need a changelog ordering mechanism in the planner.
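For readers hitting this on newer releases, a hedged sketch of a mitigation, assuming a Flink version that ships the table.exec.sink.upsert-materialize option (values NONE/AUTO/FORCE): it makes the planner insert a materialization step in front of the upsert sink that restores per-key ordering, at the cost of extra state.

  -- force a sink upsert materialization step before the Elasticsearch sink
  SET 'table.exec.sink.upsert-materialize' = 'FORCE';

  INSERT INTO test_status
  SELECT t.*, s.name
  FROM test AS t
  LEFT JOIN status AS s ON t.status = s.id;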
Issue Links
- duplicates
  - FLINK-22826 flink sql1.13.1 causes data loss based on change log stream data join (Reopened)
- is related to
  - FLINK-22899 ValuesUpsertSinkFunction needs to use global upsert (Closed)
  - FLINK-22901 Introduce getChangeLogUpsertKeys in FlinkRelMetadataQuery (Closed)
  - FLINK-23054 Correct upsert optimization by upsert keys (Closed)
  - FLINK-23350 Write doc for change log disorder by special operators (Open)
- relates to
  - FLINK-23835 Test upsert sink with upsert keys (Closed)