Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Description
Problem background
We encountered both accidental data deletion and missed deletion issues when synchronizing MySQL CDC data to HBase using the HBase connector.
Reproduction steps
1. A Flink job with parallelism 1 synchronizes a MySQL table into HBase. SinkUpsertMaterializer is turned off by setting table.exec.sink.upsert-materialize = 'NONE'.
The MySQL table schema is as follows.
CREATE TABLE `source_sample_1001` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `weight` float DEFAULT NULL,
  PRIMARY KEY (`id`)
);
The source table definition in Flink is as follows.
CREATE TABLE `source_sample_1001` (
  `id` bigint,
  `name` String,
  `age` bigint,
  `weight` float,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${ip}',
  'port' = '3306',
  'username' = '${user}',
  'password' = '${password}',
  'database-name' = 'testdb_0010',
  'table-name' = 'source_sample_1001'
);
The HBase sink table is created in the testdb_0011 namespace.
create 'testdb_0011:source_sample_1001', 'data'
describe 'testdb_0011:source_sample_1001'
# describe output
Table testdb_0011:source_sample_1001 is ENABLED
testdb_0011:source_sample_1001
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
The sink table definition in Flink is as follows.
CREATE TABLE `hbase_sink1` (
  `id` STRING COMMENT 'unique id',
  `data` ROW< `name` string, `age` string, `weight` string >,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'testdb_0011:source_sample_1001',
  'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
);
The DML in Flink to synchronize the data is as follows.
INSERT INTO `hbase_sink1`
SELECT `id`, row(`name`, `age`, `weight`)
FROM (
  SELECT
    REVERSE(CONCAT_WS('', CAST(id AS VARCHAR))) as id,
    `name`,
    cast(`age` as varchar) as `age`,
    cast(`weight` as varchar) as `weight`
  FROM `source_sample_1001`
) t;
2. Another Flink job sinks datagen data to the MySQL table source_sample_1001. The id ranges from 1 to 10_000, which means source_sample_1001 will contain at most 10_000 records.
CREATE TABLE datagen_source (
  `id` int,
  `name` String,
  `age` int,
  `weight` int
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10000',
  'fields.name.length' = '20',
  'fields.age.min' = '1',
  'fields.age.max' = '150',
  'fields.weight.min' = '5',
  'fields.weight.max' = '300',
  'rows-per-second' = '5000'
);

CREATE TABLE `source_sample_1001` (
  `id` bigint,
  `name` String,
  `age` bigint,
  `weight` float,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
  'table-name' = 'source_sample_1001',
  'username' = '${user}',
  'password' = '${password}',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '1s'
);

-- dml
INSERT INTO `source_sample_1001`
SELECT `id`, `name`, `age`, cast(`weight` as float)
FROM `datagen_source`;
3. A bash script deletes rows from the MySQL table source_sample_1001 in batches of 10.
#!/bin/bash
mysql1="mysql -h${ip} -u${user} -p${password}"
batch=10
for ((i=1; ;i++)); do
  echo "iteration $i start"
  for ((j=1; j<=10000; j+=10)); do
    $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and id < $((j+10))"
  done
  echo "iteration $i end"
  sleep 10
done
4. Start the above two Flink jobs and the bash script. Wait for several minutes; usually 5 minutes is enough. Note that the deleting bash script is necessary to reproduce the problem.
5. Stop the bash script and wait for the datagen Flink job to fill the MySQL table up to 10_000 records. Then stop the datagen job and wait for the HBase sink job to consume the entire binlog of the MySQL table source_sample_1001.
6. Check the HBase table to reproduce the data loss issue. As shown below, 67 records were lost in one test.
hbase(main):006:0> count 'testdb_0011:source_sample_1001'
9933 row(s)
Took 0.8724 seconds
=> 9933
Pick one of the missing records and check its raw data in HBase.
hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
COLUMN    CELL
0 row(s)
Took 0.0029 seconds
hbase(main):009:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
ROW    COLUMN+CELL
 24    column=data:name, timestamp=2023-05-20T21:17:44.884, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3
 24    column=data:name, timestamp=2023-05-20T21:17:43.769, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841
 24    column=data:name, timestamp=2023-05-20T21:17:42.902, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3
 24    column=data:name, timestamp=2023-05-20T21:17:41.614, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80
 24    column=data:name, timestamp=2023-05-20T21:17:40.885, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a
 24    column=data:name, timestamp=2023-05-20T21:17:40.841, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7
 24    column=data:name, timestamp=2023-05-20T21:17:39.788, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30
 24    column=data:name, timestamp=2023-05-20T21:17:35.799, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674
 24    column=data:name, timestamp=2023-05-20T21:17:35.688, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a
 24    column=data:name, timestamp=2023-05-20T21:17:35.650, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e
 24    column=data:name, timestamp=2023-05-20T21:17:34.885, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7
 24    column=data:name, timestamp=2023-05-20T21:17:33.905, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700
 24    column=data:name, timestamp=2023-05-20T21:17:33.803, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72
 24    column=data:name, timestamp=2023-05-20T21:17:33.693, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520
 24    column=data:name, timestamp=2023-05-20T21:17:31.784, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098
 24    column=data:name, timestamp=2023-05-20T21:17:30.684, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f
 24    column=data:name, timestamp=2023-05-20T21:17:29.812, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3
 24    column=data:name, timestamp=2023-05-20T21:17:29.590, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e
 24    column=data:name, timestamp=2023-05-20T21:17:28.876, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf
 24    column=data:name, timestamp=2023-05-20T21:17:28.565, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba
 24    column=data:name, timestamp=2023-05-20T21:17:27.879, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab
 24    column=data:name, timestamp=2023-05-20T21:17:27.699, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee
 24    column=data:name, timestamp=2023-05-20T21:17:24.209, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870
 24    column=data:name, timestamp=2023-05-20T21:17:22.480, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:20.716, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:18.678, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:17.720, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:16.858, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:16.682, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:15.753, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:14.571, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:11.572, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:09.681, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:08.792, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:05.888, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:05.754, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:03.626, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:02.652, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:01.790, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:00.986, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:59.797, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:58.982, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:58.626, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:58.149, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:56.610, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:51.655, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:51.458, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:44.860, type=Delete
1 row(s)
Took 0.1466 seconds
7. Start the bash script again to delete all data from the MySQL table. Wait for the HBase sink job to consume the entire binlog of the MySQL table source_sample_1001.
8. Check the HBase table to reproduce the missed deletion issue. As shown below, 6 records were not deleted in the test.
hbase(main):012:0> count 'testdb_0011:source_sample_1001'
6 row(s)
Took 0.5121 seconds
=> 6
Check the raw data of a record in HBase.
hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
COLUMN    CELL
 data:name    timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
1 row(s)
Took 0.0037 seconds
hbase(main):014:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
ROW    COLUMN+CELL
 3668    column=data:name, timestamp=2023-05-20T21:17:45.728, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3
 3668    column=data:name, timestamp=2023-05-20T21:17:44.693, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954
 3668    column=data:name, timestamp=2023-05-20T21:17:43.854, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7
 3668    column=data:name, timestamp=2023-05-20T21:17:41.721, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a
 3668    column=data:name, timestamp=2023-05-20T21:17:40.763, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8
 3668    column=data:name, timestamp=2023-05-20T21:17:37.872, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50
 3668    column=data:name, timestamp=2023-05-20T21:17:32.573, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170
 3668    column=data:name, timestamp=2023-05-20T21:17:26.811, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9
 3668    column=data:name, timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
 3668    column=data:name, timestamp=2023-05-20T21:17:24.310, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e
 3668    column=data:name, timestamp=2023-05-20T21:17:23.508, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2
 3668    column=data:name, timestamp=2023-05-20T21:17:22.788, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b
 3668    column=data:name, timestamp=2023-05-20T21:17:21.746, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:17.761, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:12.610, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:11.909, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:07.846, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:06.901, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:06.758, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:06.569, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:02.689, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:17:00.344, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:59.961, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:59.415, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:58.916, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:58.718, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:58.339, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:56.340, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:55.883, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:55.683, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:55.056, type=Delete
 3668    column=data:name, timestamp=2023-05-20T21:16:46.845, type=Delete
1 row(s)
Took 0.0457 seconds
Reason for the problem
The HBase connector uses the Delete key type without a timestamp to delete the latest version of the specified column. This is an expensive call: on the server side it first does a get to find the latest version's timestamp, then adds a delete marker using the fetched cell's timestamp. This causes the following issues:
Problem 1: When writing update data, the timestamps assigned by the HBase server to the -U and +U parts of an update message may be identical, so the -U delete marker deletes the +U data as the latest version, resulting in accidental deletion. The problem was also reported in https://issues.apache.org/jira/browse/FLINK-28910. A sketch of the effect follows.
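To make the mechanism concrete, here is a minimal sketch against the plain HBase 2.x client API (the table, row, and value are placeholders, not taken from the connector code). It shows that a version Delete marker masks a Put carrying the same timestamp even if the Put is written afterwards, which is exactly what happens when -U and +U collapse into the same millisecond:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class SameTimestampMasking {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("testdb_0011:source_sample_1001"))) {
            byte[] row = Bytes.toBytes("24");
            byte[] family = Bytes.toBytes("data");
            byte[] qualifier = Bytes.toBytes("name");
            long ts = System.currentTimeMillis();

            // -U: the connector issues a version Delete without a timestamp; the
            // server resolves it to the latest cell's timestamp. Simulate the
            // unlucky case by pinning it to ts explicitly.
            Delete minusU = new Delete(row);
            minusU.addColumn(family, qualifier, ts); // Delete key type at exactly ts
            table.delete(minusU);

            // +U: the follow-up Put lands in the same millisecond, i.e. also at ts.
            Put plusU = new Put(row);
            plusU.addColumn(family, qualifier, ts, Bytes.toBytes("value-of-plus-u"));
            table.put(plusU);

            // The delete marker masks the put of the same version even though the
            // put was written later: the row reads as deleted (accidental deletion).
            Result result = table.get(new Get(row));
            System.out.println("row visible? " + !result.isEmpty()); // row visible? false
        }
    }
}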
Problem 2: When multiple versions of an HBase cell physically exist (older cells are only purged at flush/compaction time, even with VERSIONS => '1'), deleting only the latest version exposes an earlier version of the data, resulting in the missed deletion issue. A sketch follows.
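A similar sketch for Problem 2, reusing the table handle and byte arrays from the previous sketch (the two fixed timestamps are made up for illustration):

// Two upserts at different timestamps. The older cell is not yet physically
// removed, because VERSIONS => '1' is only enforced at flush/compaction.
Put first = new Put(row);
first.addColumn(family, qualifier, 1000L, Bytes.toBytes("v1"));
table.put(first);
Put second = new Put(row);
second.addColumn(family, qualifier, 2000L, Bytes.toBytes("v2"));
table.put(second);

// -D: a version Delete without a timestamp removes ONLY the latest version
// (the server looks up ts=2000 and places a marker for that single version).
Delete delete = new Delete(row);
delete.addColumn(family, qualifier); // Delete key type, latest version only
table.delete(delete);

// The older cell at ts=1000 becomes visible again: the row is "not deleted".
Result result = table.get(new Get(row));
System.out.println(Bytes.toString(result.getValue(family, qualifier))); // prints: v1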
Solution proposal
Use the DeleteColumn key type and set strictly increasing timestamps on put and delete mutations. A DeleteColumn mutation deletes all versions of the specified column with a timestamp less than or equal to the specified one, as sketched below.
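A minimal sketch of what the mutation construction could look like under this proposal (nextTimestamp() is a hypothetical helper, not the connector's actual code; the real patch may differ in details):

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical strictly increasing timestamp generator: never goes backwards
// and never reuses a value, even if several mutations fall into one millisecond.
private static final AtomicLong LAST_TS = new AtomicLong();

static long nextTimestamp() {
    return LAST_TS.updateAndGet(prev -> Math.max(prev + 1, System.currentTimeMillis()));
}

// Upsert: Put with an explicit timestamp instead of a server-assigned one.
Put put = new Put(row);
put.addColumn(family, qualifier, nextTimestamp(), value);

// Delete: addColumns() (plural) emits a DeleteColumn marker, which removes
// ALL versions of the column with a timestamp <= the given one, so no older
// version can resurface.
Delete delete = new Delete(row);
delete.addColumns(family, qualifier, nextTimestamp());

Because the timestamps strictly increase, a later +U always carries a larger timestamp than any earlier delete marker and can never be masked by it, which addresses Problem 1 as well.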
I have tested the proposed solution for several days, and neither the accidental deletion nor the missed deletion issue occurred.
Attachments
Issue Links
- supersedes: FLINK-28910 CDC From Mysql To Hbase Bugs (Closed)
- links to