Flink / FLINK-25559

SQL JOIN causes data loss


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.14.0, 1.13.2, 1.13.3, 1.13.5, 1.14.2
    • Fix Version/s: 1.13.6, 1.14.3, 1.15.0
    • Component/s: Table SQL / API
    • Labels: None

    Description

      -- sink table; some physical fields are omitted here
      CREATE TABLE kd_product_info (
        productId BIGINT COMMENT 'product ID',
        productSaleId BIGINT COMMENT 'item ID',
        PRIMARY KEY (productSaleId) NOT ENFORCED
      )
      
      -- the SQL below likewise omits some selected fields
      INSERT INTO kd_product_info
      SELECT
       ps.product AS productId,
       ps.productsaleid AS productSaleId,
       CAST(p.complex AS INT) AS complex,
       p.createtime AS createTime,
       p.updatetime AS updateTime,
       p.ean AS ean,
       ts.availablequantity AS totalAvailableStock,
       IF(
        ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
        ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
        0
       ) AS sharedStock,
       rps.purchase AS purchase,
       v.`name` AS vendorName
       FROM
       product_sale ps
       JOIN product p ON ps.product = p.id
       LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
       LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
       LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
       LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory = mc.id
       LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
       LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
       LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale AND pse359.meta = 359
       LEFT JOIN product_image_url piu ON ps.product = piu.product 

      All table sources are upsert-kafka, and I have ensured that the join key columns have matching types:

      CREATE TABLE product_sale (
        id BIGINT COMMENT 'primary key',
        productsaleid BIGINT COMMENT 'item ID',
        product BIGINT COMMENT 'product ID',
        merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
        vendor STRING COMMENT 'vendor',
        PRIMARY KEY (productsaleid) NOT ENFORCED
      )
      -- no computed columns, just plain physical columns
      WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'XXX',
      'group.id' = '%s',
      'properties.bootstrap.servers' = '%s',
      'key.format' = 'json',
      'value.format' = 'json'
      ) 
      
      CREATE TABLE product (
        id BIGINT,
        mccategory STRING,
        PRIMARY KEY (id) NOT ENFORCED
      )
      
      CREATE TABLE rate_product_sale ( 
        id BIGINT COMMENT 'primary key',
        PRIMARY KEY (id) NOT ENFORCED
      )
      
      CREATE TABLE pss_total_stock (
        id INT COMMENT 'ID',
        productsale BIGINT COMMENT 'item code',
        PRIMARY KEY (id) NOT ENFORCED
      )
      
      CREATE TABLE vendor (
        merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
        vendor STRING COMMENT 'vendor code',
        PRIMARY KEY (merchant_id, vendor) NOT ENFORCED
      )
      
      CREATE TABLE mccategory (
        id STRING COMMENT 'MC ID',
        merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
        PRIMARY KEY (merchant_id, id) NOT ENFORCED
      )
      
      CREATE TABLE new_mccategory (
        mc STRING,
        PRIMARY KEY (mc) NOT ENFORCED
      )
      
      CREATE TABLE product_sale_grade_plus (
        productsale BIGINT,
        PRIMARY KEY (productsale) NOT ENFORCED
      )
      
      CREATE TABLE product_sale_extend (
        id BIGINT,
        product_sale BIGINT,
        meta BIGINT,
        PRIMARY KEY (id) NOT ENFORCED
      )
      
      CREATE TABLE product_image_url(
        product BIGINT,
        PRIMARY KEY (product) NOT ENFORCED 
      )

      Each table holds between 5 and 10 million rows; the job parallelism is 24.

      State TTL is not set; in fact, we can observe data loss within as little as 30 minutes.
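      For reference, the state retention and parallelism mentioned above are controlled by standard Flink table options; a minimal sketch of how they would be set in a SQL client session (the values simply restate the report's setup: TTL left at its disabled default, parallelism 24):

      ```sql
      -- 'table.exec.state.ttl' defaults to 0 ms, meaning join state is
      -- kept forever; the job described above runs with this default.
      SET 'table.exec.state.ttl' = '0 ms';
      -- parallelism used in the tests described above
      SET 'parallelism.default' = '24';
      ```

      Because no TTL is configured, expired state cannot explain the missing rows, which is why the loss after only 30 minutes points at the join/changelog handling itself.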

       

      The data flow:

      MySQL -> Flink CDC -> ODS (Upsert Kafka) -> the job -> sink

      I'm sure the ODS data in Kafka is correct;

      I have also tried using the flink-cdc source directly; it did not solve the problem.
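      For context, bypassing the ODS layer and reading with a direct CDC source means a DDL along these lines (a hedged sketch using the mysql-cdc connector from the flink-cdc-connectors project; hostname, credentials, and database/table names are placeholders, not taken from the report):

      ```sql
      -- Hypothetical mysql-cdc variant of the product_sale source;
      -- all concrete connection values below are placeholders.
      CREATE TABLE product_sale (
        id BIGINT,
        productsaleid BIGINT,
        product BIGINT,
        merchant_id DECIMAL(20, 0),
        vendor STRING,
        PRIMARY KEY (productsaleid) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'mysql-host',
        'port' = '3306',
        'username' = 'flink',
        'password' = '******',
        'database-name' = 'shop',
        'table-name' = 'product_sale'
      );
      ```

      That the loss reproduces with both upsert-kafka and a direct CDC source suggests the problem lies downstream of the source, in the join pipeline.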

       

      We tested sinking to Kudu, Kafka, and Elasticsearch;

      Also tested multiple versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, 1.14.2;

      The lost data appears out of order in Kafka; we suspect a bug in the retraction (changelog) stream handling.

       

      After many tests, we found that the more LEFT JOIN tables there are, or the higher the operator parallelism, the more easily data is lost.
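      One way to check the suspected retraction behavior is to inspect the changelog mode of each operator in the plan. A sketch (the `EXPLAIN CHANGELOG_MODE` SQL syntax shown here is available from Flink 1.15; on 1.13/1.14 the equivalent is `TableEnvironment.explainSql(sql, ExplainDetail.CHANGELOG_MODE)` in the Table API — the query body is a shortened stand-in for the full INSERT above):

      ```sql
      -- Shows which operators emit insert/update_before/update_after/delete
      -- records, i.e. where retractions are produced (Flink 1.15+ syntax).
      EXPLAIN CHANGELOG_MODE
      INSERT INTO kd_product_info
      SELECT ps.product AS productId, ps.productsaleid AS productSaleId
      FROM product_sale ps
      JOIN product p ON ps.product = p.id;
      ```

      If retractions from a chain of LEFT JOINs are redistributed across 24 parallel subtasks, an update_before and its matching update_after can arrive at the sink out of order, which matches the out-of-order records observed in Kafka.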


      People

        Assignee: Unassigned
        Reporter: Zongwen Li (Ashulin)
        Votes: 0
        Watchers: 4