Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-18461

Changelog source can't be insert into upsert sink

    XMLWordPrintableJSON

Details

    Description

      CREATE TABLE t_pick_order (
            order_no VARCHAR,
            status INT
      ) WITH (
            'connector' = 'kafka',
            'topic' = 'example',
            'scan.startup.mode' = 'latest-offset',
            'properties.bootstrap.servers' = '172.19.78.32:9092',
            'format' = 'canal-json'
      );
      
      CREATE TABLE order_status (
                order_no VARCHAR,
                status INT,
      		  PRIMARY KEY (order_no) NOT ENFORCED
      ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://xxx:3306/flink_test',
                'table-name' = 'order_status',
                'username' = 'dev',
                'password' = 'xxxx'
      );
      
      INSERT INTO order_status SELECT order_no, status FROM t_pick_order ;
      

      The above queries throw the following exception:

      [ERROR] Could not execute SQL statement. Reason:
      org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. 
      Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
      

      It is a bug in planner that we didn't fallback to BEFORE_AND_AFTER trait when ONLY_UPDATE_AFTER can't be satisfied. This results in Changelog source can't be used to written into upsert sink.

      Attachments

        Issue Links

          Activity

            People

              jark Jark Wu
              jark Jark Wu
              Votes:
              0 Vote for this issue
              Watchers:
              11 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: