Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Duplicate
Description
Problem background
We encountered two problems when synchronizing MySQL to HBase with the MySQL-CDC and HBase connectors: rows that should exist are deleted by accident, and rows that should be deleted remain in HBase.
Reproduction steps
1. A Flink job with parallelism 1 synchronizes a MySQL table into HBase. SinkMaterializer is turned off by setting table.exec.sink.upsert-materialize = 'NONE'.
The MySQL table schema is as follows.
CREATE TABLE `source_sample_1001` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(200) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`weight` float DEFAULT NULL,
PRIMARY KEY (`id`)
);
The source table definition in Flink is as follows.
CREATE TABLE `source_sample_1001` (
`id` bigint,
`name` String,
`age` bigint,
`weight` float,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc' ,
'hostname' = '${ip}',
'port' = '3306',
'username' = '${user}',
'password' = '${password}',
'database-name' = 'testdb_0010',
'table-name' = 'source_sample_1001'
);
The HBase sink table is created in the testdb_0011 namespace.
create 'testdb_0011:source_sample_1001', 'data'
describe 'testdb_0011:source_sample_1001'
- describe output
Table testdb_0011:source_sample_1001 is ENABLED
testdb_0011:source_sample_1001
COLUMN FAMILIES DESCRIPTION {NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0' , BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
The sink table definition in Flink.
CREATE TABLE `hbase_sink1` (
`id` STRING COMMENT 'unique id',
`data` ROW<
`name` string,
`age` bigint,
`weight` float
>,
primary key(`id`) not enforced
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'testdb_0011:source_sample_1001',
'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
);
The Flink DML that synchronizes the data.
INSERT INTO `hbase_sink1` SELECT
REVERSE(CONCAT_WS('', CAST(`id` AS VARCHAR))) as `id`,
ROW(`name`, `age`, `weight`)
FROM `source_sample_1001`;
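The row keys shown in the HBase shell output of this report are the reversed decimal form of the MySQL id, because of the REVERSE(...) expression above. A minimal Python sketch of that mapping, assuming the id is cast to its plain decimal string (the helper names are hypothetical and not part of Flink or HBase):

```python
def hbase_row_key(mysql_id: int) -> str:
    """Mirror REVERSE(CONCAT_WS('', CAST(id AS VARCHAR))): reverse the decimal string."""
    return str(mysql_id)[::-1]


def mysql_id_from_row_key(row_key: str) -> int:
    """Invert the mapping: reversing the row key gives back the decimal id."""
    return int(row_key[::-1])


print(hbase_row_key(42))              # 24
print(mysql_id_from_row_key("3668"))  # 8663
```

So the HBase row '24' corresponds to MySQL id 42, and the row '3668' to id 8663.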
2. Another Flink job writes datagen data to the MySQL table source_sample_1001. The id ranges from 1 to 10_000, so source_sample_1001 holds at most 10_000 records.
CREATE TABLE datagen_source (
`id` int,
`name` String,
`age` int,
`weight` float
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'random',
'fields.id.min' = '1',
'fields.id.max' = '10000',
'fields.name.length' = '20',
'rows-per-second' = '5000'
);
CREATE TABLE `source_sample_1001` (
`id` bigint,
`name` String,
`age` bigint,
`weight` float,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
'table-name' = 'source_sample_1001',
'username' = '${user}',
'password' = '${password}',
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '1s'
);
- dml
INSERT INTO `source_sample_1001` SELECT * FROM `datagen_source`;
3. A bash script deletes the rows of the MySQL table source_sample_1001 in batches of 10.
#!/bin/bash
mysql1="mysql -h${ip} -u${user} -p${password}"
batch=10
for ((i=1; ;i++)); do
echo "iteration $i start"
for ((j=1; j<=10000; j+=batch)); do
$mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and id < $((j+batch))"
done
echo "iteration $i end"
sleep 10
done
4. Start the two Flink jobs above and the bash script, then wait several minutes; 5 minutes is usually enough. Note that the bash script that deletes data is necessary to reproduce the problem.
5. Stop the bash script and wait for the datagen Flink job to fill the MySQL table with 10_000 records. Then stop the datagen Flink job and wait for the HBase sink job to read all of the binlog of the MySQL table source_sample_1001.
6. Check the HBase table to observe the data loss. As shown below, 67 records were lost in one test.
hbase(main):006:0> count 'testdb_0011:source_sample_1001'
9933 row(s)
Took 0.8724 seconds
=> 9933
Pick one missing record and check its raw data in HBase.
hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
COLUMN CELL
0 row(s)
Took 0.0029 seconds
hbase(main):009:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
ROW COLUMN+CELL
24 column=data:name, timestamp=2023-05-20T21:17:44.884, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3
24 column=data:name, timestamp=2023-05-20T21:17:43.769, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841
24 column=data:name, timestamp=2023-05-20T21:17:42.902, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3
24 column=data:name, timestamp=2023-05-20T21:17:41.614, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80
24 column=data:name, timestamp=2023-05-20T21:17:40.885, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a
24 column=data:name, timestamp=2023-05-20T21:17:40.841, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7
24 column=data:name, timestamp=2023-05-20T21:17:39.788, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30
24 column=data:name, timestamp=2023-05-20T21:17:35.799, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674
24 column=data:name, timestamp=2023-05-20T21:17:35.688, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a
24 column=data:name, timestamp=2023-05-20T21:17:35.650, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e
24 column=data:name, timestamp=2023-05-20T21:17:34.885, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7
24 column=data:name, timestamp=2023-05-20T21:17:33.905, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700
24 column=data:name, timestamp=2023-05-20T21:17:33.803, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72
24 column=data:name, timestamp=2023-05-20T21:17:33.693, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520
24 column=data:name, timestamp=2023-05-20T21:17:31.784, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098
24 column=data:name, timestamp=2023-05-20T21:17:30.684, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f
24 column=data:name, timestamp=2023-05-20T21:17:29.812, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3
24 column=data:name, timestamp=2023-05-20T21:17:29.590, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e
24 column=data:name, timestamp=2023-05-20T21:17:28.876, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf
24 column=data:name, timestamp=2023-05-20T21:17:28.565, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba
24 column=data:name, timestamp=2023-05-20T21:17:27.879, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab
24 column=data:name, timestamp=2023-05-20T21:17:27.699, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee
24 column=data:name, timestamp=2023-05-20T21:17:24.209, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870
24 column=data:name, timestamp=2023-05-20T21:17:22.480, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:20.716, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:18.678, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:17.720, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:16.858, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:16.682, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:15.753, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:14.571, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:11.572, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:09.681, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:08.792, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:05.888, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:05.754, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:03.626, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:02.652, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:01.790, type=Delete
24 column=data:name, timestamp=2023-05-20T21:17:00.986, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:59.797, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:58.982, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:58.626, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:58.149, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:56.610, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:51.655, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:51.458, type=Delete
24 column=data:name, timestamp=2023-05-20T21:16:44.860, type=Delete
1 row(s)
Took 0.1466 seconds
7. Start the bash script again to delete all data in the MySQL table, then wait for the HBase sink job to read all of the binlog of the MySQL table source_sample_1001.
8. Check the HBase table to observe the non-deletion issue. As shown below, 6 records were not deleted in the test.
hbase(main):012:0> count 'testdb_0011:source_sample_1001'
6 row(s)
Took 0.5121 seconds
=> 6
Check the raw data of a record in HBase.
hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
COLUMN CELL
data:name timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
1 row(s)
Took 0.0037 seconds
hbase(main):014:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
ROW COLUMN+CELL
3668 column=data:name, timestamp=2023-05-20T21:17:45.728, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3
3668 column=data:name, timestamp=2023-05-20T21:17:44.693, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954
3668 column=data:name, timestamp=2023-05-20T21:17:43.854, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7
3668 column=data:name, timestamp=2023-05-20T21:17:41.721, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a
3668 column=data:name, timestamp=2023-05-20T21:17:40.763, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8
3668 column=data:name, timestamp=2023-05-20T21:17:37.872, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50
3668 column=data:name, timestamp=2023-05-20T21:17:32.573, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170
3668 column=data:name, timestamp=2023-05-20T21:17:26.811, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9
3668 column=data:name, timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
3668 column=data:name, timestamp=2023-05-20T21:17:24.310, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e
3668 column=data:name, timestamp=2023-05-20T21:17:23.508, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2
3668 column=data:name, timestamp=2023-05-20T21:17:22.788, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b
3668 column=data:name, timestamp=2023-05-20T21:17:21.746, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:17.761, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:12.610, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:11.909, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:07.846, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:06.901, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:06.758, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:06.569, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:02.689, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:17:00.344, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:59.961, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:59.415, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:58.916, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:58.718, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:58.339, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:56.340, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:55.883, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:55.683, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:55.056, type=Delete
3668 column=data:name, timestamp=2023-05-20T21:16:46.845, type=Delete
1 row(s)
Took 0.0457 seconds
Reason for the problem
The HBase connector uses the Delete key type without a timestamp to delete the latest version of the specified column. This is an expensive call: on the server side, it first does a get to find the latest version's timestamp, then adds a delete marker using the fetched cell's timestamp. This causes the following issues:
Problem 1: When an update is written, the HBase server may assign the same timestamp to the -U and +U messages, so the delete from -U removes the latest version written by +U, and the data is deleted by accident. This problem is reported in https://issues.apache.org/jira/browse/FLINK-28910
Problem 2: When a column has multiple versions in HBase, deleting only the latest version exposes an earlier version, so the data appears not to be deleted.
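Both problems can be illustrated with a toy model of a single HBase column. This is only a sketch under simplifying assumptions: timestamps are plain integers passed in by the caller, flushes, compactions and the VERSIONS limit are ignored, and the class and function names are hypothetical rather than HBase APIs.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ColumnModel:
    """Toy model of one HBase column: versioned puts plus type=Delete markers."""
    puts: dict = field(default_factory=dict)           # timestamp -> value
    delete_markers: set = field(default_factory=set)   # each masks the put at exactly that timestamp

    def put(self, ts: int, value: str) -> None:
        self.puts[ts] = value

    def latest_visible_ts(self) -> Optional[int]:
        visible = [ts for ts in self.puts if ts not in self.delete_markers]
        return max(visible, default=None)

    def delete_latest(self, now: int) -> int:
        """Delete without a timestamp: look up the newest visible version and
        place the marker at its timestamp, or at `now` if nothing is visible."""
        ts = self.latest_visible_ts()
        marker_ts = now if ts is None else ts
        self.delete_markers.add(marker_ts)
        return marker_ts

    def get(self) -> Optional[str]:
        ts = self.latest_visible_ts()
        return None if ts is None else self.puts[ts]


def problem_1() -> Optional[str]:
    """-U and +U land on the same timestamp, so the new value is masked."""
    col = ColumnModel()
    col.put(100, "old")          # existing row
    col.delete_latest(now=100)   # -U: marker placed at ts 100
    col.put(100, "new")          # +U: stamped with the same millisecond
    return col.get()


def problem_2() -> Optional[str]:
    """Deleting only the latest version exposes the earlier one."""
    col = ColumnModel()
    col.put(100, "v1")
    col.put(101, "v2")           # a second version with no delete in between
    col.delete_latest(now=102)   # marker placed at ts 101 only
    return col.get()


print(problem_1())  # None
print(problem_2())  # v1
```

In the first case the row should hold "new" but reads as empty; in the second the row should be gone but still returns "v1".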
Solution proposal
Use the DeleteColumn key type and set strictly increasing timestamps on put and delete mutations. The delete mutation then deletes all versions of the specified column whose timestamp is less than or equal to the specified timestamp.
I have tested the proposed solution for several days, and neither the accidental deletion nor the non-deletion issue occurred.
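The proposal can be sketched with a similar toy model. It is an illustration under assumptions, not the connector's implementation: the timestamp generator and the column model are hypothetical, and a DeleteColumn marker is modeled as masking every put with a timestamp less than or equal to its own. In the HBase Java client this kind of marker is what Delete.addColumns(family, qualifier, timestamp) produces, as opposed to Delete.addColumn(family, qualifier).

```python
import time
from typing import Optional


class MonotonicTimestamps:
    """Hands out strictly increasing millisecond timestamps, even within one millisecond."""

    def __init__(self) -> None:
        self._last = 0

    def next(self) -> int:
        # Use the wall clock when it has advanced, otherwise bump by one.
        self._last = max(int(time.time() * 1000), self._last + 1)
        return self._last


class ColumnModel:
    """Toy model of one HBase column with DeleteColumn markers."""

    def __init__(self) -> None:
        self.puts = {}               # timestamp -> value
        self.delete_column_ts = -1   # newest DeleteColumn marker seen so far

    def put(self, ts: int, value: str) -> None:
        self.puts[ts] = value

    def delete_column(self, ts: int) -> None:
        # Masks all versions with timestamp <= ts.
        self.delete_column_ts = max(self.delete_column_ts, ts)

    def get(self) -> Optional[str]:
        visible = [ts for ts in self.puts if ts > self.delete_column_ts]
        return self.puts[max(visible)] if visible else None


def replay():
    """Replay +I, -U, +U, -D with client-assigned, strictly increasing timestamps."""
    clock = MonotonicTimestamps()
    col = ColumnModel()
    col.put(clock.next(), "v1")        # +I
    col.delete_column(clock.next())    # -U
    col.put(clock.next(), "v2")        # +U always gets a later timestamp than -U
    after_update = col.get()
    col.delete_column(clock.next())    # -D masks every earlier version
    after_delete = col.get()
    return after_update, after_delete


print(replay())  # ('v2', None)
```

Because the put of +U always carries a timestamp greater than the marker of -U, it cannot be masked, and a final delete hides every earlier version rather than only the newest one.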