Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 1.13.2, 1.13.3, 1.13.5, 1.14.0, 1.14.2
- Fix Version/s: None
Description
-- Sink table; some physical fields are omitted here
CREATE TABLE kd_product_info (
  productId BIGINT COMMENT 'product ID',
  productSaleId BIGINT COMMENT 'item ID',
  PRIMARY KEY (productSaleId) NOT ENFORCED
);

-- The INSERT below omits some selected fields
INSERT INTO kd_product_info
SELECT
  ps.product AS productId,
  ps.productsaleid AS productSaleId,
  CAST(p.complex AS INT) AS complex,
  p.createtime AS createTime,
  p.updatetime AS updateTime,
  p.ean AS ean,
  ts.availablequantity AS totalAvailableStock,
  IF (
    ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
    ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
    0
  ) AS sharedStock,
  rps.purchase AS purchase,
  v.`name` AS vendorName
FROM product_sale ps
JOIN product p ON ps.product = p.id
LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory = mc.id
LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale AND pse359.meta = 359
LEFT JOIN product_image_url piu ON ps.product = piu.product
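For context, each of those LEFT JOINs produces a retracting changelog in a streaming job: when a matching right-side row arrives after the left-side row, the join first retracts the earlier NULL-padded result and then emits the joined row. A minimal sketch of that emission pattern (rows and keys below are hypothetical, not taken from the real job):

```python
# Sketch: changelog ops a streaming LEFT JOIN emits for one join key when
# the right-side row arrives after the left-side row (hypothetical rows).
def left_join_changelog(left_row, right_row):
    """Return the changelog in arrival order: left first, right later."""
    events = [("+I", (left_row, None))]  # left arrives: NULL-padded result
    if right_row is not None:
        events.append(("-D", (left_row, None)))       # retract padded row
        events.append(("+I", (left_row, right_row)))  # emit joined row
    return events

print(left_join_changelog("ps#1", "p#1"))
# [('+I', ('ps#1', None)), ('-D', ('ps#1', None)), ('+I', ('ps#1', 'p#1'))]
```

Each additional LEFT JOIN in the query adds another operator that can emit such -D/+I pairs, which is relevant to the observation below that more left joins make the loss easier to reproduce.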
All table sources are upsert-kafka, and I have ensured that the joined columns are of the same type:
CREATE TABLE product_sale (
  id BIGINT COMMENT 'primary key',
  productsaleid BIGINT COMMENT 'item ID',
  product BIGINT COMMENT 'product ID',
  merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
  vendor STRING COMMENT 'vendor',
  PRIMARY KEY (productsaleid) NOT ENFORCED
)
-- No computed columns, just plain physical columns
WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'XXX',
  'group.id' = '%s',
  'properties.bootstrap.servers' = '%s',
  'key.format' = 'json',
  'value.format' = 'json'
);

CREATE TABLE product (
  id BIGINT,
  mccategory STRING,
  PRIMARY KEY (id) NOT ENFORCED
);

CREATE TABLE rate_product_sale (
  id BIGINT COMMENT 'primary key',
  PRIMARY KEY (id) NOT ENFORCED
);

CREATE TABLE pss_total_stock (
  id INT COMMENT 'ID',
  productsale BIGINT COMMENT 'item code',
  PRIMARY KEY (id) NOT ENFORCED
);

CREATE TABLE vendor (
  merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
  vendor STRING COMMENT 'vendor code',
  PRIMARY KEY (merchant_id, vendor) NOT ENFORCED
);

CREATE TABLE mccategory (
  id STRING COMMENT 'mc ID',
  merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
  PRIMARY KEY (merchant_id, id) NOT ENFORCED
);

CREATE TABLE new_mccategory (
  mc STRING,
  PRIMARY KEY (mc) NOT ENFORCED
);

CREATE TABLE product_sale_grade_plus (
  productsale BIGINT,
  PRIMARY KEY (productsale) NOT ENFORCED
);

CREATE TABLE product_sale_extend (
  id BIGINT,
  product_sale BIGINT,
  meta BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
);

CREATE TABLE product_image_url (
  product BIGINT,
  PRIMARY KEY (product) NOT ENFORCED
);
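For reference, an upsert-kafka source interprets each record as an upsert on its key, and a record with a null value as a delete (tombstone). A minimal sketch of that materialization logic, with hypothetical keys and values:

```python
# Sketch: materializing an upsert-kafka changelog into table state.
# A non-null value upserts the key; a null (None) value deletes it.
# Keys and values below are hypothetical, not real job data.
def apply_upsert(state, key, value):
    if value is None:
        state.pop(key, None)  # tombstone: delete the key
    else:
        state[key] = value    # insert or overwrite
    return state

state = {}
for key, value in [(101, "v1"), (102, "v1"), (101, "v2"), (102, None)]:
    apply_upsert(state, key, value)

print(state)  # {101: 'v2'}
```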
Each table holds between 5 million and 10 million rows; parallelism: 24.
No state TTL is set; in fact, we can observe data loss within as little as 30 minutes.
The data flow:
MySQL -> Flink CDC -> ODS (Upsert Kafka) -> the job -> sink
I am sure the ODS data in Kafka is correct.
I also tried using the flink-cdc source directly; it did not solve the problem.
We tested sinking to Kudu, Kafka, and ES,
and also tested multiple versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, 1.14.2.
The lost data appears out of order in Kafka, so we suspect a bug in the retraction stream.
After many tests, we found that the more tables are left-joined, or the higher the operator parallelism, the more easily data is lost.
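The suspected mechanism (compare FLINK-20370, where the sink primary key differs from the query's upsert key) can be sketched as follows: when the -D/+I pair for one sink key travels through different parallel channels, a late retraction can erase the newer insert at an upsert sink. A minimal simulation with hypothetical events:

```python
# Sketch of the suspected failure mode: changelog events for the same sink
# primary key arriving out of order across parallel channels, so a late
# retraction (-D) erases a newer insert (+I). Event data is hypothetical.
def apply_changelog(sink, events):
    """Apply (+I/-D, key, row) events to an upsert sink keyed by `key`."""
    for op, key, row in events:
        if op == "+I":
            sink[key] = row
        elif op == "-D":
            sink.pop(key, None)
    return sink

update = [("-D", 100, "old_row"), ("+I", 100, "new_row")]

# In order: the key survives with the new value.
assert apply_changelog({}, update) == {100: "new_row"}

# Out of order (-D overtakes +I on another channel): the row is lost.
assert apply_changelog({}, [update[1], update[0]]) == {}
```

Each extra left join adds one more retraction-producing operator, and higher parallelism adds more channels over which such pairs can be reordered, which would match the observation above.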
Attachments
Issue Links
- duplicates: FLINK-22826 flink sql1.13.1 causes data loss based on change log stream data join (Reopened)
- is fixed by: FLINK-20370 Result is wrong when sink primary key is not the same with query (Closed)