Flink / FLINK-21430

Stored values are appended again when a Flink SQL MySQL sink upserts on a key conflict


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.12.0
    • Fix Version/s: None
    • Component/s: Table SQL / Runtime
    • Labels: None

    Description

      Kafka data:

      {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"}
      {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"}

      Kafka DDL:
      CREATE TABLE washroom_detail (
        building_id STRING,
        sofa_id STRING,
        floor_num INT,
        occupy_status INT,
        start_time BIGINT,
        end_time BIGINT,
        process_time TIMESTAMP,
        occupy_times AS concat(
          date_format(TIMESTAMPADD(hour, 8, cast(start_time / 1000 as timestamp)), 'HH:mm'),
          '-',
          date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 'HH:mm')),
        local_date AS date_format(cast(start_time / 1000 as timestamp), 'yyyy-MM-dd'),
        day_hour AS cast(date_format(cast(start_time / 1000 as timestamp), 'HH') as INT) + 8
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'xxxxxxxx',
        'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx',
        'properties.group.id' = 'xxxxxxxxxxxx',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
      );
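      As a rough check of the computed columns, the Python sketch below evaluates occupy_times, local_date and day_hour for the two sample records. It assumes that CAST(start_time / 1000 AS TIMESTAMP) reads the value as epoch seconds in UTC; the function name computed_columns is hypothetical and not part of Flink.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def computed_columns(start_time: int, end_time: int) -> Tuple[str, str, int]:
    """Mirror occupy_times, local_date and day_hour for epoch-millisecond inputs."""
    # Assumption: CAST(x / 1000 AS TIMESTAMP) reads x as epoch seconds in UTC.
    start = datetime.fromtimestamp(start_time // 1000, tz=timezone.utc)
    end = datetime.fromtimestamp(end_time // 1000, tz=timezone.utc)
    shift = timedelta(hours=8)  # TIMESTAMPADD(hour, 8, ...)
    occupy_times = f"{start + shift:%H:%M}-{end + shift:%H:%M}"
    local_date = f"{start:%Y-%m-%d}"  # no 8-hour shift here, as in the DDL
    day_hour = start.hour + 8  # hour of the unshifted timestamp, plus 8
    return occupy_times, local_date, day_hour


# start_time / end_time values taken from the two Kafka records above
records = [(1613802148092, 1613803144969), (1613803609813, 1613803654280)]
for start_time, end_time in records:
    print(computed_columns(start_time, end_time))
```

      Under that assumption the first record gives ('14:22-14:39', '2021-02-20', 14) and the second ('14:46-14:47', '2021-02-20', 14), so both records map to the same primary key in hour_ddl.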

      MySQL DDL:

      create table hour_ddl
      (
        building_id STRING,
        sofa_id STRING,
        local_date STRING,
        `hour` INT,
        floor_num INT,
        occupy_frequency INT,
        occupy_times STRING,
        update_time TIMESTAMP,
        process_time TIMESTAMP,
        primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED
      ) with (
        'connector' = 'jdbc',
        'url' = 'xxxxxxxx',
        'table-name' = 'xxxxxxxx',
        'username' = 'xxxxx',
        'password' = 'xxxxxx'
      );

      Flink SQL DML:

      INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, occupy_frequency, occupy_times, update_time, process_time)
      SELECT a.building_id,
             a.sofa_id,
             a.local_date,
             a.day_hour,
             a.floor_num,
             CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) AS INT),
             concat(if(b.occupy_times IS NULL, '', b.occupy_times), if(b.occupy_times IS NULL, a.times, concat(',', a.times))),
             NOW(),
             a.process_time
      FROM
        (SELECT building_id,
                sofa_id,
                local_date,
                day_hour,
                floor_num,
                count(1) AS frequency,
                LISTAGG(occupy_times) AS times,
                MAX(process_time) AS process_time,
                PROCTIME() AS compute_time
         FROM washroom_detail
         GROUP BY building_id,
                  sofa_id,
                  local_date,
                  day_hour,
                  floor_num) a
      LEFT JOIN hour_ddl FOR SYSTEM_TIME AS OF a.compute_time AS b
        ON a.building_id = b.building_id
        AND a.sofa_id = b.sofa_id
        AND a.local_date = b.local_date
        AND a.day_hour = b.`hour`
      WHERE a.process_time > b.process_time
         OR b.process_time IS NULL;

      Observed behavior:

      When MySQL does not yet contain this record, the record is inserted:
        occupy_frequency  occupy_times
        1                 15:01-15:03

      On a key conflict, the record is upserted as:
        occupy_frequency  occupy_times
        3                 15:01-15:03,15:01-15:03,15:03-15:04

      Expected result:
        occupy_frequency  occupy_times
        2                 15:01-15:03,15:03-15:04
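      The reported rows can be reproduced with a small Python replay of the query for a single key. This is a sketch under three assumptions: (1) the GROUP BY emits a cumulative row after every input record, (2) only the updated rows reach the JDBC sink, and (3) each upsert is already visible in MySQL when the next lookup runs (no lookup cache, sink buffer already flushed). The dict stands in for the MySQL table; the names replay and KEY, the key values and the integer process times are all hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical primary key of hour_ddl: (building_id, sofa_id, local_date, hour)
KEY = ("b1", "s1", "2021-02-20", 15)


def replay(events: List[Tuple[str, int]]) -> List[Tuple[int, str]]:
    """Replay (occupy_times, process_time) events for one key; return each upserted row."""
    table: Dict[tuple, Tuple[int, str, int]] = {}  # stand-in for the MySQL table
    frequency = 0  # running count(1)
    times: List[str] = []  # running LISTAGG(occupy_times)
    max_process_time: Optional[int] = None  # running MAX(process_time)
    written: List[Tuple[int, str]] = []
    for occupy_times, process_time in events:
        # Subquery a: the GROUP BY result after this record (cumulative)
        frequency += 1
        times.append(occupy_times)
        agg_times = ",".join(times)  # LISTAGG's default separator is ','
        if max_process_time is None or process_time > max_process_time:
            max_process_time = process_time
        # Lookup join b: the row currently stored under the same key
        stored = table.get(KEY)
        # WHERE a.process_time > b.process_time OR b.process_time IS NULL
        if stored is not None and not max_process_time > stored[2]:
            continue
        if stored is None:
            row = (frequency, agg_times)
        else:
            # The SELECT list adds the stored row on top of the cumulative aggregate
            row = (frequency + stored[0], stored[1] + "," + agg_times)
        table[KEY] = (row[0], row[1], max_process_time)  # upsert by primary key
        written.append(row)
    return written


for row in replay([("15:01-15:03", 1), ("15:03-15:04", 2)]):
    print(row)
```

      This prints (1, '15:01-15:03') and then (3, '15:01-15:03,15:01-15:03,15:03-15:04'), matching the observed rows. Under these assumptions the cumulative aggregate row a on its own, (2, '15:01-15:03,15:03-15:04'), already equals the expected result, and adding the stored row b counts the first interval a second time.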


          People

            Assignee: Unassigned
            Reporter: Yu Wang (yuwang0917@gmail.com)
            Votes: 0
            Watchers: 3
